Line data Source code
1 : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
2 : //! nodes messaging.
3 : //!
//! Subscriptions to 1) a single timeline or 2) all timelines are possible. We
//! could add subscription to a set of timelines to save grpc streams, but
//! testing shows that many individual streams are also ok.
7 : //!
8 : //! Message is dropped if subscriber can't consume it, not affecting other
9 : //! subscribers.
10 : //!
//! Only safekeeper messages (timeline info and discovery request/response) are
//! supported, but it is not hard to add something else with generics.
13 : use clap::{command, Parser};
14 : use futures_core::Stream;
15 : use futures_util::StreamExt;
16 : use http_body_util::Full;
17 : use hyper::body::Incoming;
18 : use hyper::header::CONTENT_TYPE;
19 : use hyper::service::service_fn;
20 : use hyper::{Method, StatusCode};
21 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
22 : use parking_lot::RwLock;
23 : use std::collections::HashMap;
24 : use std::convert::Infallible;
25 : use std::net::SocketAddr;
26 : use std::pin::Pin;
27 : use std::sync::Arc;
28 : use std::time::Duration;
29 : use tokio::net::TcpListener;
30 : use tokio::sync::broadcast;
31 : use tokio::sync::broadcast::error::RecvError;
32 : use tokio::time;
33 : use tonic::body::{self, empty_body, BoxBody};
34 : use tonic::codegen::Service;
35 : use tonic::Code;
36 : use tonic::{Request, Response, Status};
37 : use tracing::*;
38 : use utils::signals::ShutdownSignals;
39 :
40 : use metrics::{Encoder, TextEncoder};
41 : use storage_broker::metrics::{
42 : BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
43 : NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
44 : };
45 : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
46 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
47 : use storage_broker::proto::{
48 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
49 : SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
50 : };
51 : use storage_broker::{parse_proto_ttid, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR};
52 : use utils::id::TenantTimelineId;
53 : use utils::logging::{self, LogFormat};
54 : use utils::sentry_init::init_sentry;
55 : use utils::{project_build_tag, project_git_version};
56 :
// Define `GIT_VERSION` and `BUILD_TAG` via the project's `utils` macros; they
// feed `--version`, the startup log line and the build-info metric in `main`.
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
59 :
/// Default capacity of the broadcast channel created for each subscribed timeline.
const DEFAULT_CHAN_SIZE: usize = 32;
/// Default capacity of the single broadcast channel feeding all-keys subscribers.
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16 * 1024;
62 :
// Command-line arguments of the storage broker.
//
// The `///` comments on the fields double as clap `--help` text, i.e. they are
// runtime-visible strings; extra notes are therefore kept in `//` comments.
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
struct Args {
    /// Endpoint to listen on.
    #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
    listen_addr: SocketAddr,
    // Capacity passed to `broadcast::channel` for each new per-timeline channel.
    /// Size of the queue to the per timeline subscriber.
    #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
    timeline_chan_size: usize,
    // Capacity of the single all-keys broadcast channel (see `SharedState::new`).
    /// Size of the queue to the all keys subscriber.
    #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
    all_keys_chan_size: usize,
    // Parsed by `humantime::parse_duration`; applied to every accepted connection.
    /// HTTP/2 keepalive interval.
    #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
    http2_keepalive_interval: Duration,
    // Converted with `LogFormat::from_config` in `main`; an invalid value aborts startup.
    /// Format for logging, either 'plain' or 'json'.
    #[arg(long, default_value = "plain")]
    log_format: String,
}
82 :
/// Id of a publisher, allocated sequentially by `SharedState::register_publisher`
/// and used to identify the publisher in log lines.
type PubId = u64;

/// Id of a subscriber, allocated sequentially by `SharedState::register_subscriber`
/// and used to identify the subscriber in log lines.
type SubId = u64;
88 :
/// Single enum type for all messages.
///
/// This is what travels over the internal broadcast channels; each variant
/// wraps the protobuf message of the same name. `PartialEq` lets tests compare
/// received messages.
#[derive(Clone, Debug, PartialEq)]
// Variants intentionally mirror the proto message names, which share the
// `Safekeeper` prefix.
#[allow(clippy::enum_variant_names)]
enum Message {
    /// Wraps a `SafekeeperTimelineInfo` proto message.
    SafekeeperTimelineInfo(SafekeeperTimelineInfo),
    /// Wraps a `SafekeeperDiscoveryRequest` proto message.
    SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
    /// Wraps a `SafekeeperDiscoveryResponse` proto message.
    SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}
97 :
98 : impl Message {
99 : /// Convert proto message to internal message.
100 0 : pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
101 0 : match proto_msg.r#type() {
102 : MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
103 0 : proto_msg.safekeeper_timeline_info.ok_or_else(|| {
104 0 : Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
105 0 : })?,
106 : )),
107 : MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
108 0 : proto_msg.safekeeper_discovery_request.ok_or_else(|| {
109 0 : Status::new(
110 0 : Code::InvalidArgument,
111 0 : "missing safekeeper_discovery_request",
112 0 : )
113 0 : })?,
114 : )),
115 : MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
116 0 : proto_msg.safekeeper_discovery_response.ok_or_else(|| {
117 0 : Status::new(
118 0 : Code::InvalidArgument,
119 0 : "missing safekeeper_discovery_response",
120 0 : )
121 0 : })?,
122 : )),
123 0 : MessageType::Unknown => Err(Status::new(
124 0 : Code::InvalidArgument,
125 0 : format!("invalid message type: {:?}", proto_msg.r#type),
126 0 : )),
127 : }
128 0 : }
129 :
130 : /// Get the tenant_timeline_id from the message.
131 2 : pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
132 2 : match self {
133 2 : Message::SafekeeperTimelineInfo(msg) => Ok(msg
134 2 : .tenant_timeline_id
135 2 : .as_ref()
136 2 : .map(parse_proto_ttid)
137 2 : .transpose()?),
138 0 : Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
139 0 : .tenant_timeline_id
140 0 : .as_ref()
141 0 : .map(parse_proto_ttid)
142 0 : .transpose()?),
143 0 : Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
144 0 : .tenant_timeline_id
145 0 : .as_ref()
146 0 : .map(parse_proto_ttid)
147 0 : .transpose()?),
148 : }
149 2 : }
150 :
151 : /// Convert internal message to the protobuf struct.
152 0 : pub fn as_typed_message(&self) -> TypedMessage {
153 0 : let mut res = TypedMessage {
154 0 : r#type: self.message_type() as i32,
155 0 : ..Default::default()
156 0 : };
157 0 : match self {
158 0 : Message::SafekeeperTimelineInfo(msg) => {
159 0 : res.safekeeper_timeline_info = Some(msg.clone())
160 : }
161 0 : Message::SafekeeperDiscoveryRequest(msg) => {
162 0 : res.safekeeper_discovery_request = Some(msg.clone())
163 : }
164 0 : Message::SafekeeperDiscoveryResponse(msg) => {
165 0 : res.safekeeper_discovery_response = Some(msg.clone())
166 : }
167 : }
168 0 : res
169 0 : }
170 :
171 : /// Get the message type.
172 0 : pub fn message_type(&self) -> MessageType {
173 0 : match self {
174 0 : Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
175 0 : Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
176 0 : Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
177 : }
178 0 : }
179 : }
180 :
/// What a subscriber listens to.
#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
    /// Every message, delivered through the shared all-keys channel
    /// (including messages that carry no tenant/timeline id).
    All,
    /// Only messages whose tenant/timeline id equals this one, delivered
    /// through that timeline's own broadcast channel.
    Timeline(TenantTimelineId),
}
186 :
187 : impl SubscriptionKey {
188 : /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
189 0 : pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
190 0 : match key {
191 0 : ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
192 0 : ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
193 0 : Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
194 : }
195 : }
196 0 : }
197 :
198 : /// Parse from FilterTenantTimelineId
199 0 : pub fn from_proto_filter_tenant_timeline_id(
200 0 : opt: Option<&FilterTenantTimelineId>,
201 0 : ) -> Result<Self, Status> {
202 0 : if opt.is_none() {
203 0 : return Ok(SubscriptionKey::All);
204 0 : }
205 0 :
206 0 : let f = opt.unwrap();
207 0 : if !f.enabled {
208 0 : return Ok(SubscriptionKey::All);
209 0 : }
210 :
211 0 : let ttid =
212 0 : parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
213 0 : Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
214 0 : })?)?;
215 0 : Ok(SubscriptionKey::Timeline(ttid))
216 0 : }
217 : }
218 :
/// Channel to timeline subscribers: one broadcast channel shared by every
/// subscriber of a single timeline.
struct ChanToTimelineSub {
    /// Sending half; each subscriber holds a receiver obtained via `subscribe()`.
    chan: broadcast::Sender<Message>,
    /// Number of registered subscribers. It is tracked separately to know when to
    /// remove this entry from `SharedState::chans_to_timeline_subs`.
    /// `receiver_count()` is unhandy for that, because unregistering and dropping
    /// the receiver side happen at different moments.
    num_subscribers: u64,
}
227 :
/// Broker-wide publisher/subscriber bookkeeping, kept behind the `RwLock` in
/// `Registry`.
struct SharedState {
    /// Id handed to the next registered publisher.
    next_pub_id: PubId,
    /// Currently registered publishers (mirrored to the `NUM_PUBS` gauge).
    num_pubs: i64,
    /// Id handed to the next registered subscriber.
    next_sub_id: SubId,
    /// Current per-timeline subscribers (mirrored to `NUM_SUBS_TIMELINE`).
    num_subs_to_timelines: i64,
    /// One broadcast channel per timeline that has at least one subscriber.
    chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
    /// Current all-keys subscribers (mirrored to `NUM_SUBS_ALL`).
    num_subs_to_all: i64,
    /// Broadcast channel that receives every message, for all-keys subscribers.
    chan_to_all_subs: broadcast::Sender<Message>,
}
237 :
238 : impl SharedState {
239 1 : pub fn new(all_keys_chan_size: usize) -> Self {
240 1 : SharedState {
241 1 : next_pub_id: 0,
242 1 : num_pubs: 0,
243 1 : next_sub_id: 0,
244 1 : num_subs_to_timelines: 0,
245 1 : chans_to_timeline_subs: HashMap::new(),
246 1 : num_subs_to_all: 0,
247 1 : chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
248 1 : }
249 1 : }
250 :
251 : // Register new publisher.
252 1 : pub fn register_publisher(&mut self) -> PubId {
253 1 : let pub_id = self.next_pub_id;
254 1 : self.next_pub_id += 1;
255 1 : self.num_pubs += 1;
256 1 : NUM_PUBS.set(self.num_pubs);
257 1 : pub_id
258 1 : }
259 :
260 : // Unregister publisher.
261 1 : pub fn unregister_publisher(&mut self) {
262 1 : self.num_pubs -= 1;
263 1 : NUM_PUBS.set(self.num_pubs);
264 1 : }
265 :
266 : // Register new subscriber.
267 2 : pub fn register_subscriber(
268 2 : &mut self,
269 2 : sub_key: SubscriptionKey,
270 2 : timeline_chan_size: usize,
271 2 : ) -> (SubId, broadcast::Receiver<Message>) {
272 2 : let sub_id = self.next_sub_id;
273 2 : self.next_sub_id += 1;
274 2 : let sub_rx = match sub_key {
275 : SubscriptionKey::All => {
276 1 : self.num_subs_to_all += 1;
277 1 : NUM_SUBS_ALL.set(self.num_subs_to_all);
278 1 : self.chan_to_all_subs.subscribe()
279 : }
280 1 : SubscriptionKey::Timeline(ttid) => {
281 1 : self.num_subs_to_timelines += 1;
282 1 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
283 1 : // Create new broadcast channel for this key, or subscriber to
284 1 : // the existing one.
285 1 : let chan_to_timeline_sub =
286 1 : self.chans_to_timeline_subs
287 1 : .entry(ttid)
288 1 : .or_insert(ChanToTimelineSub {
289 1 : chan: broadcast::channel(timeline_chan_size).0,
290 1 : num_subscribers: 0,
291 1 : });
292 1 : chan_to_timeline_sub.num_subscribers += 1;
293 1 : chan_to_timeline_sub.chan.subscribe()
294 : }
295 : };
296 2 : (sub_id, sub_rx)
297 2 : }
298 :
299 : // Unregister the subscriber.
300 2 : pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
301 2 : match sub_key {
302 1 : SubscriptionKey::All => {
303 1 : self.num_subs_to_all -= 1;
304 1 : NUM_SUBS_ALL.set(self.num_subs_to_all);
305 1 : }
306 1 : SubscriptionKey::Timeline(ttid) => {
307 1 : self.num_subs_to_timelines -= 1;
308 1 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
309 1 :
310 1 : // Remove from the map, destroying the channel, if we are the
311 1 : // last subscriber to this timeline.
312 1 :
313 1 : // Missing entry is a bug; we must have registered.
314 1 : let chan_to_timeline_sub = self
315 1 : .chans_to_timeline_subs
316 1 : .get_mut(&ttid)
317 1 : .expect("failed to find sub entry in shmem during unregister");
318 1 : chan_to_timeline_sub.num_subscribers -= 1;
319 1 : if chan_to_timeline_sub.num_subscribers == 0 {
320 1 : self.chans_to_timeline_subs.remove(&ttid);
321 1 : }
322 : }
323 : }
324 2 : }
325 : }
326 :
/// `SharedState` wrapper.
///
/// Clones share the same `SharedState` through the `Arc`. Publishers and
/// subscribers keep a clone so they can unregister themselves on drop.
#[derive(Clone)]
struct Registry {
    shared_state: Arc<RwLock<SharedState>>,
    /// Capacity used when a new per-timeline broadcast channel is created.
    timeline_chan_size: usize,
}
333 :
334 : impl Registry {
335 : // Register new publisher in shared state.
336 1 : pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
337 1 : let pub_id = self.shared_state.write().register_publisher();
338 1 : info!("publication started id={} addr={:?}", pub_id, remote_addr);
339 1 : Publisher {
340 1 : id: pub_id,
341 1 : registry: self.clone(),
342 1 : remote_addr,
343 1 : }
344 1 : }
345 :
346 1 : pub fn unregister_publisher(&self, publisher: &Publisher) {
347 1 : self.shared_state.write().unregister_publisher();
348 1 : info!(
349 0 : "publication ended id={} addr={:?}",
350 : publisher.id, publisher.remote_addr
351 : );
352 1 : }
353 :
354 : // Register new subscriber in shared state.
355 2 : pub fn register_subscriber(
356 2 : &self,
357 2 : sub_key: SubscriptionKey,
358 2 : remote_addr: SocketAddr,
359 2 : ) -> Subscriber {
360 2 : let (sub_id, sub_rx) = self
361 2 : .shared_state
362 2 : .write()
363 2 : .register_subscriber(sub_key, self.timeline_chan_size);
364 2 : info!(
365 0 : "subscription started id={}, key={:?}, addr={:?}",
366 : sub_id, sub_key, remote_addr
367 : );
368 2 : Subscriber {
369 2 : id: sub_id,
370 2 : key: sub_key,
371 2 : sub_rx,
372 2 : registry: self.clone(),
373 2 : remote_addr,
374 2 : }
375 2 : }
376 :
377 : // Unregister the subscriber
378 2 : pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
379 2 : self.shared_state
380 2 : .write()
381 2 : .unregister_subscriber(subscriber.key);
382 2 : info!(
383 0 : "subscription ended id={}, key={:?}, addr={:?}",
384 : subscriber.id, subscriber.key, subscriber.remote_addr
385 : );
386 2 : }
387 :
388 : /// Send msg to relevant subscribers.
389 2 : pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
390 2 : PROCESSED_MESSAGES_TOTAL.inc();
391 2 :
392 2 : // send message to subscribers for everything
393 2 : let shared_state = self.shared_state.read();
394 2 : // Err means there is no subscribers, it is fine.
395 2 : shared_state.chan_to_all_subs.send(msg.clone()).ok();
396 :
397 : // send message to per timeline subscribers, if there is ttid
398 2 : let ttid = msg.tenant_timeline_id()?;
399 2 : if let Some(ttid) = ttid {
400 2 : if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
401 1 : // Err can't happen here, as tx is destroyed only after removing
402 1 : // from the map the last subscriber along with tx.
403 1 : subs.chan
404 1 : .send(msg.clone())
405 1 : .expect("rx is still in the map with zero subscribers");
406 1 : }
407 0 : }
408 2 : Ok(())
409 2 : }
410 : }
411 :
/// Private subscriber state; unregisters itself from the registry on drop.
struct Subscriber {
    /// Id allocated at registration, used in log lines.
    id: SubId,
    /// What this subscriber listens to; needed again to unregister.
    key: SubscriptionKey,
    /// Subscriber receives messages from publishers here.
    sub_rx: broadcast::Receiver<Message>,
    /// Used to unregister itself from shared state in `Drop`.
    registry: Registry,
    /// Remote peer address, for logging.
    remote_addr: SocketAddr,
}
423 :
424 : impl Drop for Subscriber {
425 2 : fn drop(&mut self) {
426 2 : self.registry.unregister_subscriber(self);
427 2 : }
428 : }
429 :
/// Private publisher state; unregisters itself from the registry on drop.
struct Publisher {
    /// Id allocated at registration, used in log lines.
    id: PubId,
    /// Used to forward messages and to unregister in `Drop`.
    registry: Registry,
    /// Remote peer address, for logging.
    remote_addr: SocketAddr,
}
437 :
438 : impl Publisher {
439 : /// Send msg to relevant subscribers.
440 2 : pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
441 2 : self.registry.send_msg(msg)
442 2 : }
443 : }
444 :
445 : impl Drop for Publisher {
446 1 : fn drop(&mut self) {
447 1 : self.registry.unregister_publisher(self);
448 1 : }
449 : }
450 :
/// gRPC `BrokerService` implementation; all broker state lives in the registry.
struct Broker {
    registry: Registry,
}
454 :
455 : #[tonic::async_trait]
456 : impl BrokerService for Broker {
457 0 : async fn publish_safekeeper_info(
458 0 : &self,
459 0 : request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
460 0 : ) -> Result<Response<()>, Status> {
461 0 : let &RemoteAddr(remote_addr) = request
462 0 : .extensions()
463 0 : .get()
464 0 : .expect("RemoteAddr inserted by handler");
465 0 : let mut publisher = self.registry.register_publisher(remote_addr);
466 0 :
467 0 : let mut stream = request.into_inner();
468 :
469 : loop {
470 0 : match stream.next().await {
471 0 : Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
472 0 : Some(Err(e)) => return Err(e), // grpc error from the stream
473 0 : None => break, // closed stream
474 0 : }
475 0 : }
476 0 :
477 0 : Ok(Response::new(()))
478 0 : }
479 :
480 : type SubscribeSafekeeperInfoStream =
481 : Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
482 :
483 0 : async fn subscribe_safekeeper_info(
484 0 : &self,
485 0 : request: Request<SubscribeSafekeeperInfoRequest>,
486 0 : ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
487 0 : let &RemoteAddr(remote_addr) = request
488 0 : .extensions()
489 0 : .get()
490 0 : .expect("RemoteAddr inserted by handler");
491 0 : let proto_key = request
492 0 : .into_inner()
493 0 : .subscription_key
494 0 : .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
495 0 : let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
496 0 : let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
497 0 :
498 0 : // transform rx into stream with item = Result, as method result demands
499 0 : let output = async_stream::try_stream! {
500 0 : let mut warn_interval = time::interval(Duration::from_millis(1000));
501 0 : let mut missed_msgs: u64 = 0;
502 0 : loop {
503 0 : match subscriber.sub_rx.recv().await {
504 0 : Ok(info) => {
505 0 : match info {
506 0 : Message::SafekeeperTimelineInfo(info) => yield info,
507 0 : _ => {},
508 0 : }
509 0 : BROADCASTED_MESSAGES_TOTAL.inc();
510 0 : },
511 0 : Err(RecvError::Lagged(skipped_msg)) => {
512 0 : BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
513 0 : missed_msgs += skipped_msg;
514 0 : if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
515 0 : warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
516 0 : subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
517 0 : missed_msgs = 0;
518 0 : }
519 0 : }
520 0 : Err(RecvError::Closed) => {
521 0 : // can't happen, we never drop the channel while there is a subscriber
522 0 : Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
523 0 : }
524 0 : }
525 0 : }
526 0 : };
527 0 :
528 0 : Ok(Response::new(
529 0 : Box::pin(output) as Self::SubscribeSafekeeperInfoStream
530 0 : ))
531 0 : }
532 :
533 : type SubscribeByFilterStream =
534 : Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
535 :
536 : /// Subscribe to all messages, limited by a filter.
537 0 : async fn subscribe_by_filter(
538 0 : &self,
539 0 : request: Request<SubscribeByFilterRequest>,
540 0 : ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
541 0 : let &RemoteAddr(remote_addr) = request
542 0 : .extensions()
543 0 : .get()
544 0 : .expect("RemoteAddr inserted by handler");
545 0 : let proto_filter = request.into_inner();
546 0 : let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
547 :
548 0 : let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
549 0 : let types_set = proto_filter
550 0 : .types
551 0 : .iter()
552 0 : .map(|t| t.r#type)
553 0 : .collect::<std::collections::HashSet<_>>();
554 0 :
555 0 : let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
556 0 :
557 0 : // transform rx into stream with item = Result, as method result demands
558 0 : let output = async_stream::try_stream! {
559 0 : let mut warn_interval = time::interval(Duration::from_millis(1000));
560 0 : let mut missed_msgs: u64 = 0;
561 0 : loop {
562 0 : match subscriber.sub_rx.recv().await {
563 0 : Ok(msg) => {
564 0 : let msg_type = msg.message_type() as i32;
565 0 : if types_set.contains(&msg_type) {
566 0 : yield msg.as_typed_message();
567 0 : BROADCASTED_MESSAGES_TOTAL.inc();
568 0 : }
569 0 : },
570 0 : Err(RecvError::Lagged(skipped_msg)) => {
571 0 : BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
572 0 : missed_msgs += skipped_msg;
573 0 : if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
574 0 : warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
575 0 : subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
576 0 : missed_msgs = 0;
577 0 : }
578 0 : }
579 0 : Err(RecvError::Closed) => {
580 0 : // can't happen, we never drop the channel while there is a subscriber
581 0 : Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
582 0 : }
583 0 : }
584 0 : }
585 0 : };
586 0 :
587 0 : Ok(Response::new(
588 0 : Box::pin(output) as Self::SubscribeByFilterStream
589 0 : ))
590 0 : }
591 :
592 : /// Publish one message.
593 0 : async fn publish_one(
594 0 : &self,
595 0 : request: Request<TypedMessage>,
596 0 : ) -> std::result::Result<Response<()>, Status> {
597 0 : let msg = Message::from(request.into_inner())?;
598 0 : PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
599 0 : self.registry.send_msg(&msg)?;
600 0 : Ok(Response::new(()))
601 0 : }
602 : }
603 :
604 : // We serve only metrics and healthcheck through http1.
605 0 : async fn http1_handler(
606 0 : req: hyper::Request<Incoming>,
607 0 : ) -> Result<hyper::Response<BoxBody>, Infallible> {
608 0 : let resp = match (req.method(), req.uri().path()) {
609 0 : (&Method::GET, "/metrics") => {
610 0 : let mut buffer = vec![];
611 0 : let metrics = metrics::gather();
612 0 : let encoder = TextEncoder::new();
613 0 : encoder.encode(&metrics, &mut buffer).unwrap();
614 0 :
615 0 : hyper::Response::builder()
616 0 : .status(StatusCode::OK)
617 0 : .header(CONTENT_TYPE, encoder.format_type())
618 0 : .body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
619 0 : .unwrap()
620 : }
621 0 : (&Method::GET, "/status") => hyper::Response::builder()
622 0 : .status(StatusCode::OK)
623 0 : .body(empty_body())
624 0 : .unwrap(),
625 0 : _ => hyper::Response::builder()
626 0 : .status(StatusCode::NOT_FOUND)
627 0 : .body(empty_body())
628 0 : .unwrap(),
629 : };
630 0 : Ok(resp)
631 0 : }
632 :
/// Client address of a connection. `main` stores it in the request extensions,
/// and the gRPC handlers read it back for registration and logging.
#[derive(Clone, Copy)]
struct RemoteAddr(SocketAddr);
635 :
/// Entry point: parse args, set up logging/sentry/metrics, then serve gRPC and
/// HTTP/1 (metrics and status) on a single listening port.
///
/// It only returns early on setup errors: an invalid log format, a logging
/// init failure, or a failure to bind the listen address. Afterwards it loops
/// forever; accept and per-connection errors are just logged, and the process
/// exits from the shutdown signal handler.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // important to keep the order of:
    // 1. init logging
    // 2. tracing panic hook
    // 3. sentry
    logging::init(
        LogFormat::from_config(&args.log_format)?,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stdout,
    )?;
    logging::replace_panic_hook_with_tracing_panic_hook().forget();
    // initialize sentry if SENTRY_DSN is provided
    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
    info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
    metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);

    // On any shutdown signal, log receival and exit.
    // This runs on a dedicated OS thread and calls `process::exit(0)`, so open
    // connections are not drained gracefully.
    std::thread::spawn(move || {
        ShutdownSignals::handle(|signal| {
            info!("received {}, terminating", signal.name());
            std::process::exit(0);
        })
    });

    let registry = Registry {
        shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
        timeline_chan_size: args.timeline_chan_size,
    };
    let storage_broker_impl = Broker {
        registry: registry.clone(),
    };
    let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);

    // grpc is served along with http1 for metrics on a single port, hence we
    // don't use tonic's Server.
    let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
    info!("listening on {}", &args.listen_addr);
    loop {
        // Accept errors are logged and skipped; they never stop the server.
        let (stream, addr) = match tcp_listener.accept().await {
            Ok(v) => v,
            Err(e) => {
                info!("couldn't accept connection: {e}");
                continue;
            }
        };

        // Per-connection builder with both HTTP/1 and HTTP/2 configured.
        let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
        builder.http1().timer(TokioTimer::new());
        builder
            .http2()
            .timer(TokioTimer::new())
            .keep_alive_interval(Some(args.http2_keepalive_interval))
            // This matches the tonic server default. It allows us to support production-like workloads.
            .max_concurrent_streams(None);

        let storage_broker_server_cloned = storage_broker_server.clone();
        let remote_addr = RemoteAddr(addr);
        // The async block is awaited immediately, so this just evaluates to
        // the `service_fn` value.
        let service_fn_ = async move {
            service_fn(move |mut req| {
                // That's what tonic's MakeSvc.call does to pass conninfo to
                // the request handler (and where its request.remote_addr()
                // expects it to find).
                req.extensions_mut().insert(remote_addr);

                // Technically this second clone is not needed, but consume
                // by async block is apparently unavoidable. BTW, error
                // message is enigmatic, see
                // https://github.com/rust-lang/rust/issues/68119
                //
                // We could get away without async block at all, but then we
                // need to resort to futures::Either to merge the result,
                // which doesn't caress an eye as well.
                let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
                async move {
                    // NOTE(review): only the exact `application/grpc` content type
                    // is routed to gRPC; variants such as `application/grpc+proto`
                    // fall through to `http1_handler` (and get a 404) — confirm
                    // clients never send those.
                    if req.headers().get("content-type").map(|x| x.as_bytes())
                        == Some(b"application/grpc")
                    {
                        let res_resp = storage_broker_server_svc.call(req).await;
                        // The two branches' responses are wrapped in
                        // `http_body_util::Either` so both produce one type.
                        // NOTE(review): `http1_handler` returns tonic's `BoxBody`;
                        // if the gRPC service's body is the same type, the
                        // `Either` wrapping may be unnecessary — confirm against
                        // the tonic version in use.
                        res_resp.map(|resp| resp.map(http_body_util::Either::Left))
                    } else {
                        let res_resp = http1_handler(req).await;
                        res_resp.map(|resp| resp.map(http_body_util::Either::Right))
                    }
                }
            })
        }
        .await;

        // Serve the connection in its own task so a slow or broken client does
        // not block the accept loop; errors are only logged.
        tokio::task::spawn(async move {
            let res = builder
                .serve_connection(TokioIo::new(stream), service_fn_)
                .await;

            if let Err(e) = res {
                info!("error serving connection from {addr}: {e}");
            }
        });
    }
}
743 :
#[cfg(test)]
mod tests {
    use super::*;
    use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    /// Build a `SafekeeperTimelineInfo` message for the all-zero tenant and the
    /// given raw timeline id bytes.
    fn msg(timeline_id: Vec<u8>) -> Message {
        let tenant_timeline_id = Some(ProtoTenantTimelineId {
            tenant_id: vec![0x00; 16],
            timeline_id,
        });
        let info = SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id,
            term: 0,
            last_log_term: 0,
            flush_lsn: 1,
            commit_lsn: 2,
            backup_lsn: 3,
            remote_consistent_lsn: 4,
            peer_horizon_lsn: 5,
            safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
            http_connstr: "neon-1-sk-1.local:7677".to_owned(),
            local_start_lsn: 0,
            availability_zone: None,
            standby_horizon: 0,
        };
        Message::SafekeeperTimelineInfo(info)
    }

    /// 16-byte timeline id: eight 0xFF bytes followed by `i` in big-endian.
    fn tli_from_u64(i: u64) -> Vec<u8> {
        [0xFF_u8; 8].iter().copied().chain(i.to_be_bytes()).collect()
    }

    /// Fixed loopback address used for every registration in tests.
    fn mock_addr() -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], 8080))
    }

    #[tokio::test]
    async fn test_registry() {
        let registry = Registry {
            shared_state: Arc::new(RwLock::new(SharedState::new(16))),
            timeline_chan_size: 16,
        };

        // One subscriber for timeline 2 only, one for everything.
        let ttid_2 = TenantTimelineId {
            tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
            timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
        };
        let mut timeline_sub =
            registry.register_subscriber(SubscriptionKey::Timeline(ttid_2), mock_addr());
        let mut all_sub = registry.register_subscriber(SubscriptionKey::All, mock_addr());

        // Publish one message for timeline 1 and one for timeline 2.
        let (msg_1, msg_2) = (msg(tli_from_u64(1)), msg(tli_from_u64(2)));
        let mut publisher = registry.register_publisher(mock_addr());
        for m in [&msg_1, &msg_2] {
            publisher.send_msg(m).expect("failed to send msg");
        }

        // The timeline-2 subscriber gets exactly msg_2 and nothing more.
        assert_eq!(timeline_sub.sub_rx.try_recv().unwrap(), msg_2);
        assert_eq!(
            timeline_sub.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );

        // The all-keys subscriber gets both, in publish order.
        for expected in [&msg_1, &msg_2] {
            assert_eq!(&all_sub.sub_rx.try_recv().unwrap(), expected);
        }
        assert_eq!(
            all_sub.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );
    }
}
|