//! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
//! nodes messaging.
//!
//! Subscriptions to 1) a single timeline and 2) all timelines are possible. We
//! could add subscription to a set of timelines to save grpc streams, but
//! testing shows that many individual streams are also fine.
//!
//! A message is dropped if a subscriber can't consume it, without affecting
//! other subscribers.
//!
//! Only safekeeper messages are supported, but adding something else with
//! generics would not be hard.
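//!
//! A minimal client-side sketch of publishing a single message (illustrative
//! only: the `broker_service_client` module name follows tonic's codegen
//! convention, and the address is an assumed default; neither is defined in
//! this file):
//!
//! ```ignore
//! use storage_broker::proto::broker_service_client::BrokerServiceClient;
//! use storage_broker::proto::{MessageType, TypedMessage};
//!
//! let mut client = BrokerServiceClient::connect("http://127.0.0.1:50051").await?;
//! // `info` is a SafekeeperTimelineInfo built by the caller.
//! let msg = TypedMessage {
//!     r#type: MessageType::SafekeeperTimelineInfo as i32,
//!     safekeeper_timeline_info: Some(info),
//!     ..Default::default()
//! };
//! client.publish_one(msg).await?;
//! ```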
use clap::{command, Parser};
use futures_core::Stream;
use futures_util::StreamExt;
use hyper::header::CONTENT_TYPE;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, StatusCode};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time;
use tonic::codegen::Service;
use tonic::transport::server::Connected;
use tonic::Code;
use tonic::{Request, Response, Status};
use tracing::*;
use utils::signals::ShutdownSignals;

use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{
    BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
    NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{
    FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
    SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{
    parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
use utils::id::TenantTimelineId;
use utils::logging::{self, LogFormat};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};

project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

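// Default channel capacities. A subscriber that falls behind by more than the
// channel capacity starts losing the oldest messages (see the
// RecvError::Lagged handling below). The all-keys channel carries the entire
// message stream, hence the much larger default.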
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;

#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
struct Args {
    /// Endpoint to listen on.
    #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
    listen_addr: SocketAddr,
    /// Size of the queue to the per timeline subscriber.
    #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
    timeline_chan_size: usize,
    /// Size of the queue to the all keys subscriber.
    #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
    all_keys_chan_size: usize,
    /// HTTP/2 keepalive interval.
    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
    http2_keepalive_interval: Duration,
    /// Format for logging, either 'plain' or 'json'.
    #[arg(long, default_value = "plain")]
    log_format: String,
}
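
// An illustrative invocation (long flag names are derived by clap from the
// field names above; the values and binary name are arbitrary):
//
//     storage_broker --listen-addr=127.0.0.1:50051 \
//         --timeline-chan-size=64 --all-keys-chan-size=32768 --log-format=json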

/// Id of publisher for registering in maps
type PubId = u64;

/// Id of subscriber for registering in maps
type SubId = u64;

/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
    SafekeeperTimelineInfo(SafekeeperTimelineInfo),
    SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
    SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}

impl Message {
    /// Convert proto message to internal message.
    pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
        match proto_msg.r#type() {
            MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
                proto_msg.safekeeper_timeline_info.ok_or_else(|| {
                    Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
                })?,
            )),
            MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
                proto_msg.safekeeper_discovery_request.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_request",
                    )
                })?,
            )),
            MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
                proto_msg.safekeeper_discovery_response.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_response",
                    )
                })?,
            )),
            MessageType::Unknown => Err(Status::new(
                Code::InvalidArgument,
                format!("invalid message type: {:?}", proto_msg.r#type),
            )),
        }
    }

    /// Get the tenant_timeline_id from the message.
    pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
        match self {
            Message::SafekeeperTimelineInfo(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
        }
    }

    /// Convert internal message to the protobuf struct.
    pub fn as_typed_message(&self) -> TypedMessage {
        let mut res = TypedMessage {
            r#type: self.message_type() as i32,
            ..Default::default()
        };
        match self {
            Message::SafekeeperTimelineInfo(msg) => {
                res.safekeeper_timeline_info = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryRequest(msg) => {
                res.safekeeper_discovery_request = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryResponse(msg) => {
                res.safekeeper_discovery_response = Some(msg.clone())
            }
        }
        res
    }

    /// Get the message type.
    pub fn message_type(&self) -> MessageType {
        match self {
            Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
            Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
            Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
        }
    }
}
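
// `Message::from` and `as_typed_message` round-trip for the supported
// variants; a sketch (not a test in this file):
//
//     let msg = Message::SafekeeperTimelineInfo(info);
//     assert_eq!(Message::from(msg.as_typed_message()).unwrap(), msg);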

#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
    All,
    Timeline(TenantTimelineId),
}

impl SubscriptionKey {
    /// Parse protobuf subkey (protobuf doesn't have fixed-size bytes, we get vectors).
    pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
        match key {
            ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
            ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
                Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
            }
        }
    }

    /// Parse from FilterTenantTimelineId. A disabled filter subscribes to everything.
    pub fn from_proto_filter_tenant_timeline_id(
        f: &FilterTenantTimelineId,
    ) -> Result<Self, Status> {
        if !f.enabled {
            return Ok(SubscriptionKey::All);
        }

        let ttid =
            parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
            })?)?;
        Ok(SubscriptionKey::Timeline(ttid))
    }
}

/// Channel to timeline subscribers.
struct ChanToTimelineSub {
    chan: broadcast::Sender<Message>,
    /// Tracked separately to know when to delete the map entry; receiver_count()
    /// is inconvenient for that, as unregistering and dropping the receiver side
    /// happen at different moments.
    num_subscribers: u64,
}

struct SharedState {
    next_pub_id: PubId,
    num_pubs: i64,
    next_sub_id: SubId,
    num_subs_to_timelines: i64,
    chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
    num_subs_to_all: i64,
    chan_to_all_subs: broadcast::Sender<Message>,
}

impl SharedState {
    pub fn new(all_keys_chan_size: usize) -> Self {
        SharedState {
            next_pub_id: 0,
            num_pubs: 0,
            next_sub_id: 0,
            num_subs_to_timelines: 0,
            chans_to_timeline_subs: HashMap::new(),
            num_subs_to_all: 0,
            chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
        }
    }

    // Register new publisher.
    pub fn register_publisher(&mut self) -> PubId {
        let pub_id = self.next_pub_id;
        self.next_pub_id += 1;
        self.num_pubs += 1;
        NUM_PUBS.set(self.num_pubs);
        pub_id
    }

    // Unregister publisher.
    pub fn unregister_publisher(&mut self) {
        self.num_pubs -= 1;
        NUM_PUBS.set(self.num_pubs);
    }

    // Register new subscriber.
    pub fn register_subscriber(
        &mut self,
        sub_key: SubscriptionKey,
        timeline_chan_size: usize,
    ) -> (SubId, broadcast::Receiver<Message>) {
        let sub_id = self.next_sub_id;
        self.next_sub_id += 1;
        let sub_rx = match sub_key {
            SubscriptionKey::All => {
                self.num_subs_to_all += 1;
                NUM_SUBS_ALL.set(self.num_subs_to_all);
                self.chan_to_all_subs.subscribe()
            }
            SubscriptionKey::Timeline(ttid) => {
                self.num_subs_to_timelines += 1;
                NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
                // Create a new broadcast channel for this key, or subscribe to
                // the existing one.
                let chan_to_timeline_sub =
                    self.chans_to_timeline_subs
                        .entry(ttid)
                        .or_insert(ChanToTimelineSub {
                            chan: broadcast::channel(timeline_chan_size).0,
                            num_subscribers: 0,
                        });
                chan_to_timeline_sub.num_subscribers += 1;
                chan_to_timeline_sub.chan.subscribe()
            }
        };
        (sub_id, sub_rx)
    }

    // Unregister the subscriber.
    pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
        match sub_key {
            SubscriptionKey::All => {
                self.num_subs_to_all -= 1;
                NUM_SUBS_ALL.set(self.num_subs_to_all);
            }
            SubscriptionKey::Timeline(ttid) => {
                self.num_subs_to_timelines -= 1;
                NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);

                // Remove the entry from the map, destroying the channel, if we
                // are the last subscriber to this timeline.

                // A missing entry is a bug; we must have registered.
                let chan_to_timeline_sub = self
                    .chans_to_timeline_subs
                    .get_mut(&ttid)
                    .expect("failed to find sub entry in the map during unregister");
                chan_to_timeline_sub.num_subscribers -= 1;
                if chan_to_timeline_sub.num_subscribers == 0 {
                    self.chans_to_timeline_subs.remove(&ttid);
                }
            }
        }
    }
}

// SharedState wrapper.
#[derive(Clone)]
struct Registry {
    shared_state: Arc<RwLock<SharedState>>,
    timeline_chan_size: usize,
}

impl Registry {
    // Register new publisher in shared state.
    pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
        let pub_id = self.shared_state.write().register_publisher();
        info!("publication started id={} addr={:?}", pub_id, remote_addr);
        Publisher {
            id: pub_id,
            registry: self.clone(),
            remote_addr,
        }
    }

    pub fn unregister_publisher(&self, publisher: &Publisher) {
        self.shared_state.write().unregister_publisher();
        info!(
            "publication ended id={} addr={:?}",
            publisher.id, publisher.remote_addr
        );
    }

    // Register new subscriber in shared state.
    pub fn register_subscriber(
        &self,
        sub_key: SubscriptionKey,
        remote_addr: SocketAddr,
    ) -> Subscriber {
        let (sub_id, sub_rx) = self
            .shared_state
            .write()
            .register_subscriber(sub_key, self.timeline_chan_size);
        info!(
            "subscription started id={}, key={:?}, addr={:?}",
            sub_id, sub_key, remote_addr
        );
        Subscriber {
            id: sub_id,
            key: sub_key,
            sub_rx,
            registry: self.clone(),
            remote_addr,
        }
    }

    // Unregister the subscriber.
    pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
        self.shared_state
            .write()
            .unregister_subscriber(subscriber.key);
        info!(
            "subscription ended id={}, key={:?}, addr={:?}",
            subscriber.id, subscriber.key, subscriber.remote_addr
        );
    }

    /// Send msg to relevant subscribers.
    pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
        PROCESSED_MESSAGES_TOTAL.inc();

        // Send the message to subscribers for everything.
        let shared_state = self.shared_state.read();
        // Err means there are no subscribers, which is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();

        // Send the message to per-timeline subscribers, if there is a ttid.
        let ttid = msg.tenant_timeline_id()?;
        if let Some(ttid) = ttid {
            if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
                // Err can't happen here: the sender is destroyed only when the
                // last subscriber is removed from the map along with it.
                subs.chan
                    .send(msg.clone())
                    .expect("chan with zero subscribers is still in the map");
            }
        }
        Ok(())
    }
}
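
// Note on locking: send_msg takes only the read lock, and
// broadcast::Sender::send works through &self, so concurrent publishers do
// not contend on the write lock; the write lock is taken only when
// publishers or subscribers (un)register.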

// Private subscriber state.
struct Subscriber {
    id: SubId,
    key: SubscriptionKey,
    // Subscriber receives messages from publishers here.
    sub_rx: broadcast::Receiver<Message>,
    // to unregister itself from shared state in Drop
    registry: Registry,
    // for logging
    remote_addr: SocketAddr,
}

impl Drop for Subscriber {
    fn drop(&mut self) {
        self.registry.unregister_subscriber(self);
    }
}

// Private publisher state
struct Publisher {
    id: PubId,
    registry: Registry,
    // for logging
    remote_addr: SocketAddr,
}

impl Publisher {
    /// Send msg to relevant subscribers.
    pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
        self.registry.send_msg(msg)
    }
}

impl Drop for Publisher {
    fn drop(&mut self) {
        self.registry.unregister_publisher(self);
    }
}

struct Broker {
    registry: Registry,
}

#[tonic::async_trait]
impl BrokerService for Broker {
    async fn publish_safekeeper_info(
        &self,
        request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
    ) -> Result<Response<()>, Status> {
        let remote_addr = request
            .remote_addr()
            .expect("TCPConnectInfo inserted by handler");
        let mut publisher = self.registry.register_publisher(remote_addr);

        let mut stream = request.into_inner();

        loop {
            match stream.next().await {
                Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
                Some(Err(e)) => return Err(e), // grpc error from the stream
                None => break,                 // closed stream
            }
        }

        Ok(Response::new(()))
    }

    type SubscribeSafekeeperInfoStream =
        Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;

    async fn subscribe_safekeeper_info(
        &self,
        request: Request<SubscribeSafekeeperInfoRequest>,
    ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
        let remote_addr = request
            .remote_addr()
            .expect("TCPConnectInfo inserted by handler");
        let proto_key = request
            .into_inner()
            .subscription_key
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
        let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
        let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);

        // Transform rx into a stream with item = Result, as the method result demands.
        let output = async_stream::try_stream! {
            let mut warn_interval = time::interval(Duration::from_millis(1000));
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(info) => {
                        match info {
                            Message::SafekeeperTimelineInfo(info) => yield info,
                            _ => {},
                        }
                        BROADCASTED_MESSAGES_TOTAL.inc();
                    },
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
                                subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
                            missed_msgs = 0;
                        }
                    }
                    Err(RecvError::Closed) => {
                        // Can't happen: we never drop the channel while there is a subscriber.
                        Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
                    }
                }
            }
        };

        Ok(Response::new(
            Box::pin(output) as Self::SubscribeSafekeeperInfoStream
        ))
    }

    type SubscribeByFilterStream =
        Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;

    /// Subscribe to all messages, limited by a filter.
    async fn subscribe_by_filter(
        &self,
        request: Request<SubscribeByFilterRequest>,
    ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
        let remote_addr = request
            .remote_addr()
            .expect("TCPConnectInfo inserted by handler");
        let proto_filter = request.into_inner();
        let ttid_filter = proto_filter
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;

        let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
        let types_set = proto_filter
            .types
            .iter()
            .map(|t| t.r#type)
            .collect::<std::collections::HashSet<_>>();

        let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);

        // Transform rx into a stream with item = Result, as the method result demands.
        let output = async_stream::try_stream! {
            let mut warn_interval = time::interval(Duration::from_millis(1000));
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(msg) => {
                        let msg_type = msg.message_type() as i32;
                        if types_set.contains(&msg_type) {
                            yield msg.as_typed_message();
                            BROADCASTED_MESSAGES_TOTAL.inc();
                        }
                    },
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
                                subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
                            missed_msgs = 0;
                        }
                    }
                    Err(RecvError::Closed) => {
                        // Can't happen: we never drop the channel while there is a subscriber.
                        Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
                    }
                }
            }
        };

        Ok(Response::new(
            Box::pin(output) as Self::SubscribeByFilterStream
        ))
    }

    /// Publish one message.
    async fn publish_one(
        &self,
        request: Request<TypedMessage>,
    ) -> std::result::Result<Response<()>, Status> {
        let msg = Message::from(request.into_inner())?;
        PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
        self.registry.send_msg(&msg)?;
        Ok(Response::new(()))
    }
}
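
// Client-side sketch of building a filtered subscription request. The element
// type of `types` is not named in this file; `TypeSubscription` below is an
// assumed name from the proto definition:
//
//     let request = SubscribeByFilterRequest {
//         tenant_timeline_id: Some(FilterTenantTimelineId {
//             enabled: true,
//             tenant_timeline_id: Some(proto_ttid),
//         }),
//         types: vec![TypeSubscription {
//             r#type: MessageType::SafekeeperTimelineInfo as i32,
//         }],
//     };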

// We serve only metrics and healthcheck through http1.
async fn http1_handler(
    req: hyper::Request<hyper::body::Body>,
) -> Result<hyper::Response<Body>, Infallible> {
    let resp = match (req.method(), req.uri().path()) {
        (&Method::GET, "/metrics") => {
            let mut buffer = vec![];
            let metrics = metrics::gather();
            let encoder = TextEncoder::new();
            encoder.encode(&metrics, &mut buffer).unwrap();

            hyper::Response::builder()
                .status(StatusCode::OK)
                .header(CONTENT_TYPE, encoder.format_type())
                .body(Body::from(buffer))
                .unwrap()
        }
        (&Method::GET, "/status") => hyper::Response::builder()
            .status(StatusCode::OK)
            .body(Body::empty())
            .unwrap(),
        _ => hyper::Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())
            .unwrap(),
    };
    Ok(resp)
}
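
// Example checks (assuming the broker listens on 127.0.0.1:50051; substitute
// the actual --listen-addr):
//
//     curl http://127.0.0.1:50051/metrics   # Prometheus text-format metrics
//     curl http://127.0.0.1:50051/status    # liveness check, returns 200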

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // It is important to keep the order of:
    // 1. init logging
    // 2. tracing panic hook
    // 3. sentry
    logging::init(
        LogFormat::from_config(&args.log_format)?,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stdout,
    )?;
    logging::replace_panic_hook_with_tracing_panic_hook().forget();
    // Initialize sentry if SENTRY_DSN is provided.
    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
    info!("version: {GIT_VERSION}");
    info!("build_tag: {BUILD_TAG}");
    metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);

    // On any shutdown signal, log its receipt and exit.
    std::thread::spawn(move || {
        ShutdownSignals::handle(|signal| {
            info!("received {}, terminating", signal.name());
            std::process::exit(0);
        })
    });

    let registry = Registry {
        shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
        timeline_chan_size: args.timeline_chan_size,
    };
    let storage_broker_impl = Broker {
        registry: registry.clone(),
    };
    let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);

    info!("listening on {}", &args.listen_addr);

    // grpc is served along with http1 for metrics on a single port, hence we
    // don't use tonic's Server.
    hyper::Server::bind(&args.listen_addr)
        .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
        .serve(make_service_fn(move |conn: &AddrStream| {
            let storage_broker_server_cloned = storage_broker_server.clone();
            let connect_info = conn.connect_info();
            async move {
                Ok::<_, Infallible>(service_fn(move |mut req| {
                    // That's what tonic's MakeSvc.call does to pass conninfo to
                    // the request handler (and where request.remote_addr()
                    // expects to find it).
                    req.extensions_mut().insert(connect_info.clone());

                    // Technically this second clone is not needed, but consumption
                    // by the async block is apparently unavoidable. BTW, the error
                    // message is enigmatic, see
                    // https://github.com/rust-lang/rust/issues/68119
                    //
                    // We could get away without the async block at all, but then we
                    // would need to resort to futures::Either to merge the results,
                    // which doesn't read as nicely either.
                    let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
                    async move {
                        if req.headers().get("content-type").map(|x| x.as_bytes())
                            == Some(b"application/grpc")
                        {
                            let res_resp = storage_broker_server_svc.call(req).await;
                            // Grpc and http1 handlers have slightly different
                            // Response types: it is UnsyncBoxBody for the
                            // former (not sure why) and plain hyper::Body
                            // for the latter. Both implement HttpBody though,
                            // and EitherBody is used to merge them.
                            res_resp.map(|resp| resp.map(EitherBody::Left))
                        } else {
                            let res_resp = http1_handler(req).await;
                            res_resp.map(|resp| resp.map(EitherBody::Right))
                        }
                    }
                }))
            }
        }))
        .await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    fn msg(timeline_id: Vec<u8>) -> Message {
        Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: vec![0x00; 16],
                timeline_id,
            }),
            term: 0,
            last_log_term: 0,
            flush_lsn: 1,
            commit_lsn: 2,
            backup_lsn: 3,
            remote_consistent_lsn: 4,
            peer_horizon_lsn: 5,
            safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
            http_connstr: "neon-1-sk-1.local:7677".to_owned(),
            local_start_lsn: 0,
            availability_zone: None,
        })
    }

    fn tli_from_u64(i: u64) -> Vec<u8> {
        let mut timeline_id = vec![0xFF; 8];
        timeline_id.extend_from_slice(&i.to_be_bytes());
        timeline_id
    }

    fn mock_addr() -> SocketAddr {
        "127.0.0.1:8080".parse().unwrap()
    }

    #[tokio::test]
    async fn test_registry() {
        let registry = Registry {
            shared_state: Arc::new(RwLock::new(SharedState::new(16))),
            timeline_chan_size: 16,
        };

        // Subscribe to timeline 2.
        let ttid_2 = TenantTimelineId {
            tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
            timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
        };
        let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
        let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
        let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());

        // Send two messages with different keys.
        let msg_1 = msg(tli_from_u64(1));
        let msg_2 = msg(tli_from_u64(2));
        let mut publisher = registry.register_publisher(mock_addr());
        publisher.send_msg(&msg_1).expect("failed to send msg");
        publisher.send_msg(&msg_2).expect("failed to send msg");

        // The message with key 2 should arrive at subscriber_2...
        assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);

        // ...but nothing more.
        assert_eq!(
            subscriber_2.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );

        // subscriber_all should receive both messages.
        assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
        assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
        assert_eq!(
            subscriber_all.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );
    }
}