Line data Source code
//! Simple pub-sub based on gRPC (tonic) and Tokio broadcast channels for
//! messaging between storage nodes.
//!
//! Subscriptions either to 1) a single timeline or 2) all timelines are
//! possible. We could add subscriptions to a set of timelines to save gRPC
//! streams, but testing shows that many individual streams are also fine.
//!
//! A message is dropped for a subscriber that can't keep up with consuming it;
//! this does not affect other subscribers.
//!
//! Supported messages are safekeeper timeline info and safekeeper discovery
//! requests/responses (see `Message`); supporting another kind means adding a
//! variant there.
13 : use std::collections::HashMap;
14 : use std::convert::Infallible;
15 : use std::net::SocketAddr;
16 : use std::pin::Pin;
17 : use std::sync::Arc;
18 : use std::time::Duration;
19 :
20 : use camino::Utf8PathBuf;
21 : use clap::{Parser, command};
22 : use futures::future::OptionFuture;
23 : use futures_core::Stream;
24 : use futures_util::StreamExt;
25 : use http_body_util::Full;
26 : use http_utils::tls_certs::ReloadingCertificateResolver;
27 : use hyper::body::Incoming;
28 : use hyper::header::CONTENT_TYPE;
29 : use hyper::service::service_fn;
30 : use hyper::{Method, StatusCode};
31 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
32 : use metrics::{Encoder, TextEncoder};
33 : use parking_lot::RwLock;
34 : use storage_broker::metrics::{
35 : BROADCAST_DROPPED_MESSAGES_TOTAL, BROADCASTED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
36 : NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
37 : };
38 : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
39 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
40 : use storage_broker::proto::{
41 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
42 : SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
43 : };
44 : use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
45 : use tokio::net::TcpListener;
46 : use tokio::sync::broadcast;
47 : use tokio::sync::broadcast::error::RecvError;
48 : use tokio::time;
49 : use tonic::body::{self, BoxBody, empty_body};
50 : use tonic::codegen::Service;
51 : use tonic::{Code, Request, Response, Status};
52 : use tracing::*;
53 : use utils::id::TenantTimelineId;
54 : use utils::logging::{self, LogFormat};
55 : use utils::sentry_init::init_sentry;
56 : use utils::signals::ShutdownSignals;
57 : use utils::{project_build_tag, project_git_version};
58 :
59 : project_git_version!(GIT_VERSION);
60 : project_build_tag!(BUILD_TAG);
61 :
62 : const DEFAULT_CHAN_SIZE: usize = 32;
63 : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
64 :
65 : const DEFAULT_SSL_KEY_FILE: &str = "server.key";
66 : const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
67 : const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
68 :
69 : #[derive(Parser, Debug)]
70 : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
71 : #[clap(group(
72 : clap::ArgGroup::new("listen-addresses")
73 : .required(true)
74 : .multiple(true)
75 : .args(&["listen_addr", "listen_https_addr"]),
76 : ))]
77 : struct Args {
78 : /// Endpoint to listen HTTP on.
79 : #[arg(short, long)]
80 : listen_addr: Option<SocketAddr>,
81 : /// Endpoint to listen HTTPS on.
82 : #[arg(long)]
83 : listen_https_addr: Option<SocketAddr>,
84 : /// Size of the queue to the per timeline subscriber.
85 0 : #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
86 0 : timeline_chan_size: usize,
87 : /// Size of the queue to the all keys subscriber.
88 0 : #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
89 0 : all_keys_chan_size: usize,
90 : /// HTTP/2 keepalive interval.
91 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
92 0 : http2_keepalive_interval: Duration,
93 : /// Format for logging, either 'plain' or 'json'.
94 : #[arg(long, default_value = "plain")]
95 0 : log_format: String,
96 : /// Path to a file with certificate's private key for https API.
97 : #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
98 0 : ssl_key_file: Utf8PathBuf,
99 : /// Path to a file with a X509 certificate for https API.
100 : #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
101 0 : ssl_cert_file: Utf8PathBuf,
102 : /// Period to reload certificate and private key from files.
103 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
104 0 : ssl_cert_reload_period: Duration,
105 : }
106 :
107 : /// Id of publisher for registering in maps
108 : type PubId = u64;
109 :
110 : /// Id of subscriber for registering in maps
111 : type SubId = u64;
112 :
113 : /// Single enum type for all messages.
114 : #[derive(Clone, Debug, PartialEq)]
115 : #[allow(clippy::enum_variant_names)]
116 : enum Message {
117 : SafekeeperTimelineInfo(SafekeeperTimelineInfo),
118 : SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
119 : SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
120 : }
121 :
122 : impl Message {
123 : /// Convert proto message to internal message.
124 : #[allow(clippy::result_large_err, reason = "TODO")]
125 0 : pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
126 0 : match proto_msg.r#type() {
127 : MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
128 0 : proto_msg.safekeeper_timeline_info.ok_or_else(|| {
129 0 : Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
130 0 : })?,
131 : )),
132 : MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
133 0 : proto_msg.safekeeper_discovery_request.ok_or_else(|| {
134 0 : Status::new(
135 0 : Code::InvalidArgument,
136 0 : "missing safekeeper_discovery_request",
137 0 : )
138 0 : })?,
139 : )),
140 : MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
141 0 : proto_msg.safekeeper_discovery_response.ok_or_else(|| {
142 0 : Status::new(
143 0 : Code::InvalidArgument,
144 0 : "missing safekeeper_discovery_response",
145 0 : )
146 0 : })?,
147 : )),
148 0 : MessageType::Unknown => Err(Status::new(
149 0 : Code::InvalidArgument,
150 0 : format!("invalid message type: {:?}", proto_msg.r#type),
151 0 : )),
152 : }
153 0 : }
154 :
155 : /// Get the tenant_timeline_id from the message.
156 : #[allow(clippy::result_large_err, reason = "TODO")]
157 2 : pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
158 2 : match self {
159 2 : Message::SafekeeperTimelineInfo(msg) => Ok(msg
160 2 : .tenant_timeline_id
161 2 : .as_ref()
162 2 : .map(parse_proto_ttid)
163 2 : .transpose()?),
164 0 : Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
165 0 : .tenant_timeline_id
166 0 : .as_ref()
167 0 : .map(parse_proto_ttid)
168 0 : .transpose()?),
169 0 : Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
170 0 : .tenant_timeline_id
171 0 : .as_ref()
172 0 : .map(parse_proto_ttid)
173 0 : .transpose()?),
174 : }
175 2 : }
176 :
177 : /// Convert internal message to the protobuf struct.
178 0 : pub fn as_typed_message(&self) -> TypedMessage {
179 0 : let mut res = TypedMessage {
180 0 : r#type: self.message_type() as i32,
181 0 : ..Default::default()
182 0 : };
183 0 : match self {
184 0 : Message::SafekeeperTimelineInfo(msg) => {
185 0 : res.safekeeper_timeline_info = Some(msg.clone())
186 : }
187 0 : Message::SafekeeperDiscoveryRequest(msg) => {
188 0 : res.safekeeper_discovery_request = Some(msg.clone())
189 : }
190 0 : Message::SafekeeperDiscoveryResponse(msg) => {
191 0 : res.safekeeper_discovery_response = Some(msg.clone())
192 : }
193 : }
194 0 : res
195 0 : }
196 :
197 : /// Get the message type.
198 0 : pub fn message_type(&self) -> MessageType {
199 0 : match self {
200 0 : Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
201 0 : Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
202 0 : Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
203 : }
204 0 : }
205 : }
206 :
207 : #[derive(Copy, Clone, Debug)]
208 : enum SubscriptionKey {
209 : All,
210 : Timeline(TenantTimelineId),
211 : }
212 :
213 : impl SubscriptionKey {
214 : /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
215 : #[allow(clippy::result_large_err, reason = "TODO")]
216 0 : pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
217 0 : match key {
218 0 : ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
219 0 : ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
220 0 : Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
221 : }
222 : }
223 0 : }
224 :
225 : /// Parse from FilterTenantTimelineId
226 : #[allow(clippy::result_large_err, reason = "TODO")]
227 0 : pub fn from_proto_filter_tenant_timeline_id(
228 0 : opt: Option<&FilterTenantTimelineId>,
229 0 : ) -> Result<Self, Status> {
230 0 : if opt.is_none() {
231 0 : return Ok(SubscriptionKey::All);
232 0 : }
233 0 :
234 0 : let f = opt.unwrap();
235 0 : if !f.enabled {
236 0 : return Ok(SubscriptionKey::All);
237 0 : }
238 :
239 0 : let ttid =
240 0 : parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
241 0 : Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
242 0 : })?)?;
243 0 : Ok(SubscriptionKey::Timeline(ttid))
244 0 : }
245 : }
246 :
247 : /// Channel to timeline subscribers.
248 : struct ChanToTimelineSub {
249 : chan: broadcast::Sender<Message>,
250 : /// Tracked separately to know when delete the shmem entry. receiver_count()
251 : /// is unhandy for that as unregistering and dropping the receiver side
252 : /// happens at different moments.
253 : num_subscribers: u64,
254 : }
255 :
256 : struct SharedState {
257 : next_pub_id: PubId,
258 : num_pubs: i64,
259 : next_sub_id: SubId,
260 : num_subs_to_timelines: i64,
261 : chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
262 : num_subs_to_all: i64,
263 : chan_to_all_subs: broadcast::Sender<Message>,
264 : }
265 :
266 : impl SharedState {
267 1 : pub fn new(all_keys_chan_size: usize) -> Self {
268 1 : SharedState {
269 1 : next_pub_id: 0,
270 1 : num_pubs: 0,
271 1 : next_sub_id: 0,
272 1 : num_subs_to_timelines: 0,
273 1 : chans_to_timeline_subs: HashMap::new(),
274 1 : num_subs_to_all: 0,
275 1 : chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
276 1 : }
277 1 : }
278 :
279 : // Register new publisher.
280 1 : pub fn register_publisher(&mut self) -> PubId {
281 1 : let pub_id = self.next_pub_id;
282 1 : self.next_pub_id += 1;
283 1 : self.num_pubs += 1;
284 1 : NUM_PUBS.set(self.num_pubs);
285 1 : pub_id
286 1 : }
287 :
288 : // Unregister publisher.
289 1 : pub fn unregister_publisher(&mut self) {
290 1 : self.num_pubs -= 1;
291 1 : NUM_PUBS.set(self.num_pubs);
292 1 : }
293 :
294 : // Register new subscriber.
295 2 : pub fn register_subscriber(
296 2 : &mut self,
297 2 : sub_key: SubscriptionKey,
298 2 : timeline_chan_size: usize,
299 2 : ) -> (SubId, broadcast::Receiver<Message>) {
300 2 : let sub_id = self.next_sub_id;
301 2 : self.next_sub_id += 1;
302 2 : let sub_rx = match sub_key {
303 : SubscriptionKey::All => {
304 1 : self.num_subs_to_all += 1;
305 1 : NUM_SUBS_ALL.set(self.num_subs_to_all);
306 1 : self.chan_to_all_subs.subscribe()
307 : }
308 1 : SubscriptionKey::Timeline(ttid) => {
309 1 : self.num_subs_to_timelines += 1;
310 1 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
311 1 : // Create new broadcast channel for this key, or subscriber to
312 1 : // the existing one.
313 1 : let chan_to_timeline_sub =
314 1 : self.chans_to_timeline_subs
315 1 : .entry(ttid)
316 1 : .or_insert(ChanToTimelineSub {
317 1 : chan: broadcast::channel(timeline_chan_size).0,
318 1 : num_subscribers: 0,
319 1 : });
320 1 : chan_to_timeline_sub.num_subscribers += 1;
321 1 : chan_to_timeline_sub.chan.subscribe()
322 : }
323 : };
324 2 : (sub_id, sub_rx)
325 2 : }
326 :
327 : // Unregister the subscriber.
328 2 : pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
329 2 : match sub_key {
330 1 : SubscriptionKey::All => {
331 1 : self.num_subs_to_all -= 1;
332 1 : NUM_SUBS_ALL.set(self.num_subs_to_all);
333 1 : }
334 1 : SubscriptionKey::Timeline(ttid) => {
335 1 : self.num_subs_to_timelines -= 1;
336 1 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
337 1 :
338 1 : // Remove from the map, destroying the channel, if we are the
339 1 : // last subscriber to this timeline.
340 1 :
341 1 : // Missing entry is a bug; we must have registered.
342 1 : let chan_to_timeline_sub = self
343 1 : .chans_to_timeline_subs
344 1 : .get_mut(&ttid)
345 1 : .expect("failed to find sub entry in shmem during unregister");
346 1 : chan_to_timeline_sub.num_subscribers -= 1;
347 1 : if chan_to_timeline_sub.num_subscribers == 0 {
348 1 : self.chans_to_timeline_subs.remove(&ttid);
349 1 : }
350 : }
351 : }
352 2 : }
353 : }
354 :
355 : // SharedState wrapper.
356 : #[derive(Clone)]
357 : struct Registry {
358 : shared_state: Arc<RwLock<SharedState>>,
359 : timeline_chan_size: usize,
360 : }
361 :
362 : impl Registry {
363 : // Register new publisher in shared state.
364 1 : pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
365 1 : let pub_id = self.shared_state.write().register_publisher();
366 1 : info!("publication started id={} addr={:?}", pub_id, remote_addr);
367 1 : Publisher {
368 1 : id: pub_id,
369 1 : registry: self.clone(),
370 1 : remote_addr,
371 1 : }
372 1 : }
373 :
374 1 : pub fn unregister_publisher(&self, publisher: &Publisher) {
375 1 : self.shared_state.write().unregister_publisher();
376 1 : info!(
377 0 : "publication ended id={} addr={:?}",
378 : publisher.id, publisher.remote_addr
379 : );
380 1 : }
381 :
382 : // Register new subscriber in shared state.
383 2 : pub fn register_subscriber(
384 2 : &self,
385 2 : sub_key: SubscriptionKey,
386 2 : remote_addr: SocketAddr,
387 2 : ) -> Subscriber {
388 2 : let (sub_id, sub_rx) = self
389 2 : .shared_state
390 2 : .write()
391 2 : .register_subscriber(sub_key, self.timeline_chan_size);
392 2 : info!(
393 0 : "subscription started id={}, key={:?}, addr={:?}",
394 : sub_id, sub_key, remote_addr
395 : );
396 2 : Subscriber {
397 2 : id: sub_id,
398 2 : key: sub_key,
399 2 : sub_rx,
400 2 : registry: self.clone(),
401 2 : remote_addr,
402 2 : }
403 2 : }
404 :
405 : // Unregister the subscriber
406 2 : pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
407 2 : self.shared_state
408 2 : .write()
409 2 : .unregister_subscriber(subscriber.key);
410 2 : info!(
411 0 : "subscription ended id={}, key={:?}, addr={:?}",
412 : subscriber.id, subscriber.key, subscriber.remote_addr
413 : );
414 2 : }
415 :
416 : /// Send msg to relevant subscribers.
417 : #[allow(clippy::result_large_err, reason = "TODO")]
418 2 : pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
419 2 : PROCESSED_MESSAGES_TOTAL.inc();
420 2 :
421 2 : // send message to subscribers for everything
422 2 : let shared_state = self.shared_state.read();
423 2 : // Err means there is no subscribers, it is fine.
424 2 : shared_state.chan_to_all_subs.send(msg.clone()).ok();
425 :
426 : // send message to per timeline subscribers, if there is ttid
427 2 : let ttid = msg.tenant_timeline_id()?;
428 2 : if let Some(ttid) = ttid {
429 2 : if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
430 1 : // Err can't happen here, as tx is destroyed only after removing
431 1 : // from the map the last subscriber along with tx.
432 1 : subs.chan
433 1 : .send(msg.clone())
434 1 : .expect("rx is still in the map with zero subscribers");
435 1 : }
436 0 : }
437 2 : Ok(())
438 2 : }
439 : }
440 :
441 : // Private subscriber state.
442 : struct Subscriber {
443 : id: SubId,
444 : key: SubscriptionKey,
445 : // Subscriber receives messages from publishers here.
446 : sub_rx: broadcast::Receiver<Message>,
447 : // to unregister itself from shared state in Drop
448 : registry: Registry,
449 : // for logging
450 : remote_addr: SocketAddr,
451 : }
452 :
453 : impl Drop for Subscriber {
454 2 : fn drop(&mut self) {
455 2 : self.registry.unregister_subscriber(self);
456 2 : }
457 : }
458 :
459 : // Private publisher state
460 : struct Publisher {
461 : id: PubId,
462 : registry: Registry,
463 : // for logging
464 : remote_addr: SocketAddr,
465 : }
466 :
467 : impl Publisher {
468 : /// Send msg to relevant subscribers.
469 : #[allow(clippy::result_large_err, reason = "TODO")]
470 2 : pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
471 2 : self.registry.send_msg(msg)
472 2 : }
473 : }
474 :
475 : impl Drop for Publisher {
476 1 : fn drop(&mut self) {
477 1 : self.registry.unregister_publisher(self);
478 1 : }
479 : }
480 :
481 : struct Broker {
482 : registry: Registry,
483 : }
484 :
485 : #[tonic::async_trait]
486 : impl BrokerService for Broker {
487 0 : async fn publish_safekeeper_info(
488 0 : &self,
489 0 : request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
490 0 : ) -> Result<Response<()>, Status> {
491 0 : let &RemoteAddr(remote_addr) = request
492 0 : .extensions()
493 0 : .get()
494 0 : .expect("RemoteAddr inserted by handler");
495 0 : let mut publisher = self.registry.register_publisher(remote_addr);
496 0 :
497 0 : let mut stream = request.into_inner();
498 :
499 : loop {
500 0 : match stream.next().await {
501 0 : Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
502 0 : Some(Err(e)) => return Err(e), // grpc error from the stream
503 0 : None => break, // closed stream
504 0 : }
505 0 : }
506 0 :
507 0 : Ok(Response::new(()))
508 0 : }
509 :
510 : type SubscribeSafekeeperInfoStream =
511 : Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
512 :
513 0 : async fn subscribe_safekeeper_info(
514 0 : &self,
515 0 : request: Request<SubscribeSafekeeperInfoRequest>,
516 0 : ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
517 0 : let &RemoteAddr(remote_addr) = request
518 0 : .extensions()
519 0 : .get()
520 0 : .expect("RemoteAddr inserted by handler");
521 0 : let proto_key = request
522 0 : .into_inner()
523 0 : .subscription_key
524 0 : .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
525 0 : let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
526 0 : let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
527 0 :
528 0 : // transform rx into stream with item = Result, as method result demands
529 0 : let output = async_stream::try_stream! {
530 0 : let mut warn_interval = time::interval(Duration::from_millis(1000));
531 0 : let mut missed_msgs: u64 = 0;
532 0 : loop {
533 0 : match subscriber.sub_rx.recv().await {
534 0 : Ok(info) => {
535 0 : match info {
536 0 : Message::SafekeeperTimelineInfo(info) => yield info,
537 0 : _ => {},
538 0 : }
539 0 : BROADCASTED_MESSAGES_TOTAL.inc();
540 0 : },
541 0 : Err(RecvError::Lagged(skipped_msg)) => {
542 0 : BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
543 0 : missed_msgs += skipped_msg;
544 0 : if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
545 0 : warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
546 0 : subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
547 0 : missed_msgs = 0;
548 0 : }
549 0 : }
550 0 : Err(RecvError::Closed) => {
551 0 : // can't happen, we never drop the channel while there is a subscriber
552 0 : Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
553 0 : }
554 0 : }
555 0 : }
556 0 : };
557 0 :
558 0 : Ok(Response::new(
559 0 : Box::pin(output) as Self::SubscribeSafekeeperInfoStream
560 0 : ))
561 0 : }
562 :
563 : type SubscribeByFilterStream =
564 : Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
565 :
566 : /// Subscribe to all messages, limited by a filter.
567 0 : async fn subscribe_by_filter(
568 0 : &self,
569 0 : request: Request<SubscribeByFilterRequest>,
570 0 : ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
571 0 : let &RemoteAddr(remote_addr) = request
572 0 : .extensions()
573 0 : .get()
574 0 : .expect("RemoteAddr inserted by handler");
575 0 : let proto_filter = request.into_inner();
576 0 : let ttid_filter = proto_filter.tenant_timeline_id.as_ref();
577 :
578 0 : let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
579 0 : let types_set = proto_filter
580 0 : .types
581 0 : .iter()
582 0 : .map(|t| t.r#type)
583 0 : .collect::<std::collections::HashSet<_>>();
584 0 :
585 0 : let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
586 0 :
587 0 : // transform rx into stream with item = Result, as method result demands
588 0 : let output = async_stream::try_stream! {
589 0 : let mut warn_interval = time::interval(Duration::from_millis(1000));
590 0 : let mut missed_msgs: u64 = 0;
591 0 : loop {
592 0 : match subscriber.sub_rx.recv().await {
593 0 : Ok(msg) => {
594 0 : let msg_type = msg.message_type() as i32;
595 0 : if types_set.contains(&msg_type) {
596 0 : yield msg.as_typed_message();
597 0 : BROADCASTED_MESSAGES_TOTAL.inc();
598 0 : }
599 0 : },
600 0 : Err(RecvError::Lagged(skipped_msg)) => {
601 0 : BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
602 0 : missed_msgs += skipped_msg;
603 0 : if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
604 0 : warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
605 0 : subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
606 0 : missed_msgs = 0;
607 0 : }
608 0 : }
609 0 : Err(RecvError::Closed) => {
610 0 : // can't happen, we never drop the channel while there is a subscriber
611 0 : Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
612 0 : }
613 0 : }
614 0 : }
615 0 : };
616 0 :
617 0 : Ok(Response::new(
618 0 : Box::pin(output) as Self::SubscribeByFilterStream
619 0 : ))
620 0 : }
621 :
622 : /// Publish one message.
623 0 : async fn publish_one(
624 0 : &self,
625 0 : request: Request<TypedMessage>,
626 0 : ) -> std::result::Result<Response<()>, Status> {
627 0 : let msg = Message::from(request.into_inner())?;
628 0 : PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
629 0 : self.registry.send_msg(&msg)?;
630 0 : Ok(Response::new(()))
631 0 : }
632 : }
633 :
634 : // We serve only metrics and healthcheck through http1.
635 0 : async fn http1_handler(
636 0 : req: hyper::Request<Incoming>,
637 0 : ) -> Result<hyper::Response<BoxBody>, Infallible> {
638 0 : let resp = match (req.method(), req.uri().path()) {
639 0 : (&Method::GET, "/metrics") => {
640 0 : let mut buffer = vec![];
641 0 : let metrics = metrics::gather();
642 0 : let encoder = TextEncoder::new();
643 0 : encoder.encode(&metrics, &mut buffer).unwrap();
644 0 :
645 0 : hyper::Response::builder()
646 0 : .status(StatusCode::OK)
647 0 : .header(CONTENT_TYPE, encoder.format_type())
648 0 : .body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
649 0 : .unwrap()
650 : }
651 0 : (&Method::GET, "/status") => hyper::Response::builder()
652 0 : .status(StatusCode::OK)
653 0 : .body(empty_body())
654 0 : .unwrap(),
655 0 : _ => hyper::Response::builder()
656 0 : .status(StatusCode::NOT_FOUND)
657 0 : .body(empty_body())
658 0 : .unwrap(),
659 : };
660 0 : Ok(resp)
661 0 : }
662 :
/// Peer address of a connection. `main` inserts it into each request's
/// extensions, and the gRPC handlers read it back for logging.
#[derive(Copy, Clone)]
struct RemoteAddr(SocketAddr);
665 :
666 : #[tokio::main]
667 0 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
668 0 : let args = Args::parse();
669 0 :
670 0 : // important to keep the order of:
671 0 : // 1. init logging
672 0 : // 2. tracing panic hook
673 0 : // 3. sentry
674 0 : logging::init(
675 0 : LogFormat::from_config(&args.log_format)?,
676 0 : logging::TracingErrorLayerEnablement::Disabled,
677 0 : logging::Output::Stdout,
678 0 : )?;
679 0 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
680 0 : // initialize sentry if SENTRY_DSN is provided
681 0 : let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
682 0 : info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
683 0 : metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
684 0 :
685 0 : // On any shutdown signal, log receival and exit.
686 0 : std::thread::spawn(move || {
687 0 : ShutdownSignals::handle(|signal| {
688 0 : info!("received {}, terminating", signal.name());
689 0 : std::process::exit(0);
690 0 : })
691 0 : });
692 0 :
693 0 : let registry = Registry {
694 0 : shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
695 0 : timeline_chan_size: args.timeline_chan_size,
696 0 : };
697 0 : let storage_broker_impl = Broker {
698 0 : registry: registry.clone(),
699 0 : };
700 0 : let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
701 0 :
702 0 : let http_listener = match &args.listen_addr {
703 0 : Some(addr) => {
704 0 : info!("listening HTTP on {}", addr);
705 0 : Some(TcpListener::bind(addr).await?)
706 0 : }
707 0 : None => None,
708 0 : };
709 0 :
710 0 : let (https_listener, tls_acceptor) = match &args.listen_https_addr {
711 0 : Some(addr) => {
712 0 : let listener = TcpListener::bind(addr).await?;
713 0 :
714 0 : let cert_resolver = ReloadingCertificateResolver::new(
715 0 : "main",
716 0 : &args.ssl_key_file,
717 0 : &args.ssl_cert_file,
718 0 : args.ssl_cert_reload_period,
719 0 : )
720 0 : .await?;
721 0 :
722 0 : let mut tls_config = rustls::ServerConfig::builder()
723 0 : .with_no_client_auth()
724 0 : .with_cert_resolver(cert_resolver);
725 0 :
726 0 : // Tonic is HTTP/2 only and it negotiates it with ALPN.
727 0 : tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
728 0 :
729 0 : let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
730 0 :
731 0 : info!("listening HTTPS on {}", addr);
732 0 : (Some(listener), Some(acceptor))
733 0 : }
734 0 : None => (None, None),
735 0 : };
736 0 :
737 0 : // grpc is served along with http1 for metrics on a single port, hence we
738 0 : // don't use tonic's Server.
739 0 : loop {
740 0 : let (conn, is_https) = tokio::select! {
741 0 : Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
742 0 : Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
743 0 : };
744 0 :
745 0 : let (tcp_stream, addr) = match conn {
746 0 : Ok(v) => v,
747 0 : Err(e) => {
748 0 : info!("couldn't accept connection: {e}");
749 0 : continue;
750 0 : }
751 0 : };
752 0 :
753 0 : let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
754 0 : builder.http1().timer(TokioTimer::new());
755 0 : builder
756 0 : .http2()
757 0 : .timer(TokioTimer::new())
758 0 : .keep_alive_interval(Some(args.http2_keepalive_interval))
759 0 : // This matches the tonic server default. It allows us to support production-like workloads.
760 0 : .max_concurrent_streams(None);
761 0 :
762 0 : let storage_broker_server_cloned = storage_broker_server.clone();
763 0 : let remote_addr = RemoteAddr(addr);
764 0 : let service_fn_ = async move {
765 0 : service_fn(move |mut req| {
766 0 : // That's what tonic's MakeSvc.call does to pass conninfo to
767 0 : // the request handler (and where its request.remote_addr()
768 0 : // expects it to find).
769 0 : req.extensions_mut().insert(remote_addr);
770 0 :
771 0 : // Technically this second clone is not needed, but consume
772 0 : // by async block is apparently unavoidable. BTW, error
773 0 : // message is enigmatic, see
774 0 : // https://github.com/rust-lang/rust/issues/68119
775 0 : //
776 0 : // We could get away without async block at all, but then we
777 0 : // need to resort to futures::Either to merge the result,
778 0 : // which doesn't caress an eye as well.
779 0 : let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
780 0 : async move {
781 0 : if req.headers().get("content-type").map(|x| x.as_bytes())
782 0 : == Some(b"application/grpc")
783 0 : {
784 0 : let res_resp = storage_broker_server_svc.call(req).await;
785 0 : // Grpc and http1 handlers have slightly different
786 0 : // Response types: it is UnsyncBoxBody for the
787 0 : // former one (not sure why) and plain hyper::Body
788 0 : // for the latter. Both implement HttpBody though,
789 0 : // and `Either` is used to merge them.
790 0 : res_resp.map(|resp| resp.map(http_body_util::Either::Left))
791 0 : } else {
792 0 : let res_resp = http1_handler(req).await;
793 0 : res_resp.map(|resp| resp.map(http_body_util::Either::Right))
794 0 : }
795 0 : }
796 0 : })
797 0 : }
798 0 : .await;
799 0 :
800 0 : let tls_acceptor = tls_acceptor.clone();
801 0 :
802 0 : tokio::task::spawn(async move {
803 0 : let res = if is_https {
804 0 : let tls_acceptor =
805 0 : tls_acceptor.expect("tls_acceptor is set together with https_listener");
806 0 :
807 0 : let tls_stream = match tls_acceptor.accept(tcp_stream).await {
808 0 : Ok(tls_stream) => tls_stream,
809 0 : Err(e) => {
810 0 : info!("error accepting TLS connection from {addr}: {e}");
811 0 : return;
812 0 : }
813 0 : };
814 0 :
815 0 : builder
816 0 : .serve_connection(TokioIo::new(tls_stream), service_fn_)
817 0 : .await
818 0 : } else {
819 0 : builder
820 0 : .serve_connection(TokioIo::new(tcp_stream), service_fn_)
821 0 : .await
822 0 : };
823 0 :
824 0 : if let Err(e) = res {
825 0 : info!(%is_https, "error serving connection from {addr}: {e}");
826 0 : }
827 0 : });
828 0 : }
829 0 : }
830 :
#[cfg(test)]
mod tests {
    use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    use super::*;

    /// Builds a `SafekeeperTimelineInfo` message for tenant 0 and the given raw timeline id.
    fn msg(timeline_id: Vec<u8>) -> Message {
        Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: vec![0x00; 16],
                timeline_id,
            }),
            term: 0,
            last_log_term: 0,
            flush_lsn: 1,
            commit_lsn: 2,
            backup_lsn: 3,
            remote_consistent_lsn: 4,
            peer_horizon_lsn: 5,
            safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
            http_connstr: "neon-1-sk-1.local:7677".to_owned(),
            https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
            local_start_lsn: 0,
            availability_zone: None,
            standby_horizon: 0,
        })
    }

    /// 16-byte raw timeline id: eight 0xFF bytes followed by `i` in big-endian.
    fn tli_from_u64(i: u64) -> Vec<u8> {
        let mut timeline_id = vec![0xFF; 8];
        timeline_id.extend_from_slice(&i.to_be_bytes());
        timeline_id
    }

    /// The parsed id carried by `msg(tli_from_u64(i))`.
    fn ttid_from_u64(i: u64) -> TenantTimelineId {
        TenantTimelineId {
            tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
            timeline_id: TimelineId::from_slice(&tli_from_u64(i)).unwrap(),
        }
    }

    fn mock_addr() -> SocketAddr {
        "127.0.0.1:8080".parse().unwrap()
    }

    fn new_registry() -> Registry {
        Registry {
            shared_state: Arc::new(RwLock::new(SharedState::new(16))),
            timeline_chan_size: 16,
        }
    }

    #[tokio::test]
    async fn test_registry() {
        let registry = new_registry();

        // subscribe to timeline 2
        let sub_key_2 = SubscriptionKey::Timeline(ttid_from_u64(2));
        let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
        let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());

        // send two messages with different keys
        let msg_1 = msg(tli_from_u64(1));
        let msg_2 = msg(tli_from_u64(2));
        let mut publisher = registry.register_publisher(mock_addr());
        publisher.send_msg(&msg_1).expect("failed to send msg");
        publisher.send_msg(&msg_2).expect("failed to send msg");

        // msg with key 2 should arrive to subscriber_2
        assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);

        // but nothing more
        assert_eq!(
            subscriber_2.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );

        // subscriber_all should receive both messages
        assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
        assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
        assert_eq!(
            subscriber_all.sub_rx.try_recv().unwrap_err(),
            TryRecvError::Empty
        );
    }

    /// Dropping timeline subscribers updates the counters and removes the
    /// timeline's channel together with its last subscriber.
    #[tokio::test]
    async fn test_unregister_removes_timeline_channel() {
        let registry = new_registry();
        let ttid = ttid_from_u64(2);
        let key = SubscriptionKey::Timeline(ttid);

        let first = registry.register_subscriber(key, mock_addr());
        let second = registry.register_subscriber(key, mock_addr());
        {
            let state = registry.shared_state.read();
            assert_eq!(state.num_subs_to_timelines, 2);
            assert_eq!(state.chans_to_timeline_subs[&ttid].num_subscribers, 2);
        }

        drop(first);
        assert_eq!(
            registry.shared_state.read().chans_to_timeline_subs[&ttid].num_subscribers,
            1
        );

        drop(second);
        {
            let state = registry.shared_state.read();
            assert_eq!(state.num_subs_to_timelines, 0);
            assert!(state.chans_to_timeline_subs.is_empty());
        }

        // With the channel gone, publishing for that timeline is a no-op, not a panic.
        let mut publisher = registry.register_publisher(mock_addr());
        publisher
            .send_msg(&msg(tli_from_u64(2)))
            .expect("failed to send msg");
    }
}
|