LCOV - code coverage report
Current view: top level - safekeeper/src - broker.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 0.0 % 200 0
Test Date: 2025-05-01 22:50:11 Functions: 0.0 % 17 0

            Line data    Source code
       1              : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
       2              : 
       3              : use std::sync::Arc;
       4              : use std::sync::atomic::AtomicU64;
       5              : use std::time::{Duration, Instant, UNIX_EPOCH};
       6              : 
       7              : use anyhow::{Context, Error, Result, anyhow, bail};
       8              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
       9              : use storage_broker::proto::{
      10              :     FilterTenantTimelineId, MessageType, SafekeeperDiscoveryResponse, SubscribeByFilterRequest,
      11              :     SubscribeSafekeeperInfoRequest, TypeSubscription, TypedMessage,
      12              : };
      13              : use storage_broker::{Request, parse_proto_ttid};
      14              : use tokio::task::JoinHandle;
      15              : use tokio::time::sleep;
      16              : use tracing::*;
      17              : 
      18              : use crate::metrics::{
      19              :     BROKER_ITERATION_TIMELINES, BROKER_PULLED_UPDATES, BROKER_PUSH_ALL_UPDATES_SECONDS,
      20              :     BROKER_PUSHED_UPDATES,
      21              : };
      22              : use crate::{GlobalTimelines, SafeKeeperConf};
      23              : 
      24              : const RETRY_INTERVAL_MSEC: u64 = 1000;
      25              : const PUSH_INTERVAL_MSEC: u64 = 1000;
      26              : 
      27            0 : fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
      28            0 :     storage_broker::ClientTlsConfig::new().ca_certificates(
      29            0 :         conf.ssl_ca_certs
      30            0 :             .iter()
      31            0 :             .map(pem::encode)
      32            0 :             .map(storage_broker::Certificate::from_pem),
      33            0 :     )
      34            0 : }
      35              : 
      36              : /// Push once in a while data about all active timelines to the broker.
      37            0 : async fn push_loop(
      38            0 :     conf: Arc<SafeKeeperConf>,
      39            0 :     global_timelines: Arc<GlobalTimelines>,
      40            0 : ) -> anyhow::Result<()> {
      41            0 :     if conf.disable_periodic_broker_push {
      42            0 :         info!("broker push_loop is disabled, doing nothing...");
      43            0 :         futures::future::pending::<()>().await; // sleep forever
      44            0 :         return Ok(());
      45            0 :     }
      46            0 : 
      47            0 :     let active_timelines_set = global_timelines.get_global_broker_active_set();
      48              : 
      49            0 :     let mut client = storage_broker::connect(
      50            0 :         conf.broker_endpoint.clone(),
      51            0 :         conf.broker_keepalive_interval,
      52            0 :         make_tls_config(&conf),
      53            0 :     )?;
      54            0 :     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
      55            0 : 
      56            0 :     let outbound = async_stream::stream! {
      57            0 :         loop {
      58            0 :             // Note: we lock runtime here and in timeline methods as GlobalTimelines
      59            0 :             // is under plain mutex. That's ok, all this code is not performance
      60            0 :             // sensitive and there is no risk of deadlock as we don't await while
      61            0 :             // lock is held.
      62            0 :             let now = Instant::now();
      63            0 :             let all_tlis = active_timelines_set.get_all();
      64            0 :             let mut n_pushed_tlis = 0;
      65            0 :             for tli in &all_tlis {
      66            0 :                 let sk_info = tli.get_safekeeper_info(&conf).await;
      67            0 :                 yield sk_info;
      68            0 :                 BROKER_PUSHED_UPDATES.inc();
      69            0 :                 n_pushed_tlis += 1;
      70            0 :             }
      71            0 :             let elapsed = now.elapsed();
      72            0 : 
      73            0 :             BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
      74            0 :             BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
      75            0 : 
      76            0 :             if elapsed > push_interval / 2 {
      77            0 :                 info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
      78            0 :             }
      79            0 : 
      80            0 :             sleep(push_interval).await;
      81            0 :         }
      82            0 :     };
      83            0 :     client
      84            0 :         .publish_safekeeper_info(Request::new(outbound))
      85            0 :         .await?;
      86            0 :     Ok(())
      87            0 : }
      88              : 
      89              : /// Subscribe and fetch all the interesting data from the broker.
      90              : #[instrument(name = "broker_pull", skip_all)]
      91              : async fn pull_loop(
      92              :     conf: Arc<SafeKeeperConf>,
      93              :     global_timelines: Arc<GlobalTimelines>,
      94              :     stats: Arc<BrokerStats>,
      95              : ) -> Result<()> {
      96              :     let mut client = storage_broker::connect(
      97              :         conf.broker_endpoint.clone(),
      98              :         conf.broker_keepalive_interval,
      99              :         make_tls_config(&conf),
     100              :     )?;
     101              : 
     102              :     // TODO: subscribe only to local timelines instead of all
     103              :     let request = SubscribeSafekeeperInfoRequest {
     104              :         subscription_key: Some(ProtoSubscriptionKey::All(())),
     105              :     };
     106              : 
     107              :     let mut stream = client
     108              :         .subscribe_safekeeper_info(request)
     109              :         .await
     110              :         .context("subscribe_safekeper_info request failed")?
     111              :         .into_inner();
     112              : 
     113              :     let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
     114              :     let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
     115              :     let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
     116              : 
     117              :     while let Some(msg) = stream.message().await? {
     118              :         stats.update_pulled();
     119              : 
     120              :         let proto_ttid = msg
     121              :             .tenant_timeline_id
     122              :             .as_ref()
     123            0 :             .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
     124              :         let ttid = parse_proto_ttid(proto_ttid)?;
     125              :         if let Ok(tli) = global_timelines.get(ttid) {
     126              :             // Note that we also receive *our own* info. That's
     127              :             // important, as it is used as an indication of live
     128              :             // connection to the broker.
     129              : 
     130              :             // note: there are blocking operations below, but it's considered fine for now
     131              :             let res = tli.record_safekeeper_info(msg).await;
     132              :             if res.is_ok() {
     133              :                 ok_counter.inc();
     134              :             } else {
     135              :                 err_counter.inc();
     136              :             }
     137              :             res?;
     138              :         } else {
     139              :             not_found.inc();
     140              :         }
     141              :     }
     142              :     bail!("end of stream");
     143              : }
     144              : 
     145              : /// Process incoming discover requests. This is done in a separate task to avoid
     146              : /// interfering with the normal pull/push loops.
     147            0 : async fn discover_loop(
     148            0 :     conf: Arc<SafeKeeperConf>,
     149            0 :     global_timelines: Arc<GlobalTimelines>,
     150            0 :     stats: Arc<BrokerStats>,
     151            0 : ) -> Result<()> {
     152            0 :     let mut client = storage_broker::connect(
     153            0 :         conf.broker_endpoint.clone(),
     154            0 :         conf.broker_keepalive_interval,
     155            0 :         make_tls_config(&conf),
     156            0 :     )?;
     157              : 
     158            0 :     let request = SubscribeByFilterRequest {
     159            0 :         types: vec![TypeSubscription {
     160            0 :             r#type: MessageType::SafekeeperDiscoveryRequest as i32,
     161            0 :         }],
     162            0 :         tenant_timeline_id: Some(FilterTenantTimelineId {
     163            0 :             enabled: false,
     164            0 :             tenant_timeline_id: None,
     165            0 :         }),
     166            0 :     };
     167              : 
     168            0 :     let mut stream = client
     169            0 :         .subscribe_by_filter(request)
     170            0 :         .await
     171            0 :         .context("subscribe_by_filter request failed")?
     172            0 :         .into_inner();
     173            0 : 
     174            0 :     let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
     175              : 
     176            0 :     while let Some(typed_msg) = stream.message().await? {
     177            0 :         stats.update_pulled();
     178            0 : 
     179            0 :         match typed_msg.r#type() {
     180              :             MessageType::SafekeeperDiscoveryRequest => {
     181            0 :                 let msg = typed_msg
     182            0 :                     .safekeeper_discovery_request
     183            0 :                     .expect("proto type mismatch from broker message");
     184              : 
     185            0 :                 let proto_ttid = msg
     186            0 :                     .tenant_timeline_id
     187            0 :                     .as_ref()
     188            0 :                     .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
     189            0 :                 let ttid = parse_proto_ttid(proto_ttid)?;
     190            0 :                 if let Ok(tli) = global_timelines.get(ttid) {
     191              :                     // we received a discovery request for a timeline we know about
     192            0 :                     discover_counter.inc();
     193              : 
     194              :                     // create and reply with discovery response
     195            0 :                     let sk_info = tli.get_safekeeper_info(&conf).await;
     196            0 :                     let response = SafekeeperDiscoveryResponse {
     197            0 :                         safekeeper_id: sk_info.safekeeper_id,
     198            0 :                         tenant_timeline_id: sk_info.tenant_timeline_id,
     199            0 :                         commit_lsn: sk_info.commit_lsn,
     200            0 :                         safekeeper_connstr: sk_info.safekeeper_connstr,
     201            0 :                         availability_zone: sk_info.availability_zone,
     202            0 :                         standby_horizon: 0,
     203            0 :                     };
     204            0 : 
     205            0 :                     // note this is a blocking call
     206            0 :                     client
     207            0 :                         .publish_one(TypedMessage {
     208            0 :                             r#type: MessageType::SafekeeperDiscoveryResponse as i32,
     209            0 :                             safekeeper_timeline_info: None,
     210            0 :                             safekeeper_discovery_request: None,
     211            0 :                             safekeeper_discovery_response: Some(response),
     212            0 :                         })
     213            0 :                         .await?;
     214            0 :                 }
     215              :             }
     216              : 
     217              :             _ => {
     218            0 :                 warn!(
     219            0 :                     "unexpected message type i32 {}, {:?}",
     220            0 :                     typed_msg.r#type,
     221            0 :                     typed_msg.r#type()
     222              :                 );
     223              :             }
     224              :         }
     225              :     }
     226            0 :     bail!("end of stream");
     227            0 : }
     228              : 
     229            0 : pub async fn task_main(
     230            0 :     conf: Arc<SafeKeeperConf>,
     231            0 :     global_timelines: Arc<GlobalTimelines>,
     232            0 : ) -> anyhow::Result<()> {
     233            0 :     info!("started, broker endpoint {:?}", conf.broker_endpoint);
     234              : 
     235            0 :     let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
     236            0 :     let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
     237            0 :     let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
     238            0 :     let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
     239            0 : 
     240            0 :     let stats = Arc::new(BrokerStats::new());
     241            0 :     let stats_task = task_stats(stats.clone());
     242            0 :     tokio::pin!(stats_task);
     243              : 
     244              :     // Selecting on JoinHandles requires some squats; is there a better way to
     245              :     // reap tasks individually?
     246              : 
     247              :     // Handling failures in task itself won't catch panic and in Tokio, task's
     248              :     // panic doesn't kill the whole executor, so it is better to do reaping
     249              :     // here.
     250              :     loop {
     251            0 :         tokio::select! {
     252            0 :                 res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
     253              :                     // was it panic or normal error?
     254            0 :                     let err = match res {
     255            0 :                         Ok(res_internal) => res_internal.unwrap_err(),
     256            0 :                         Err(err_outer) => err_outer.into(),
     257              :                     };
     258            0 :                     warn!("push task failed: {:?}", err);
     259            0 :                     push_handle = None;
     260              :                 },
     261            0 :                 res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
     262              :                     // was it panic or normal error?
     263            0 :                     match res {
     264            0 :                         Ok(res_internal) => if let Err(err_inner) = res_internal {
     265            0 :                             warn!("pull task failed: {:?}", err_inner);
     266            0 :                         }
     267            0 :                         Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
     268              :                     };
     269            0 :                     pull_handle = None;
     270              :                 },
     271            0 :                 res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
     272              :                     // was it panic or normal error?
     273            0 :                     match res {
     274            0 :                         Ok(res_internal) => if let Err(err_inner) = res_internal {
     275            0 :                             warn!("discover task failed: {:?}", err_inner);
     276            0 :                         }
     277            0 :                         Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
     278              :                     };
     279            0 :                     discover_handle = None;
     280              :                 },
     281            0 :                 _ = ticker.tick() => {
     282            0 :                     if push_handle.is_none() {
     283            0 :                         push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
     284            0 :                     }
     285            0 :                     if pull_handle.is_none() {
     286            0 :                         pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
     287            0 :                     }
     288            0 :                     if discover_handle.is_none() {
     289            0 :                         discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
     290            0 :                     }
     291              :                 },
     292            0 :                 _ = &mut stats_task => {}
     293              :         }
     294              :     }
     295              : }
     296              : 
     297              : struct BrokerStats {
     298              :     /// Timestamp of the last received message from the broker.
     299              :     last_pulled_ts: AtomicU64,
     300              : }
     301              : 
     302              : impl BrokerStats {
     303            0 :     fn new() -> Self {
     304            0 :         BrokerStats {
     305            0 :             last_pulled_ts: AtomicU64::new(0),
     306            0 :         }
     307            0 :     }
     308              : 
     309            0 :     fn now_millis() -> u64 {
     310            0 :         std::time::SystemTime::now()
     311            0 :             .duration_since(UNIX_EPOCH)
     312            0 :             .expect("time is before epoch")
     313            0 :             .as_millis() as u64
     314            0 :     }
     315              : 
     316              :     /// Update last_pulled timestamp to current time.
     317            0 :     fn update_pulled(&self) {
     318            0 :         self.last_pulled_ts
     319            0 :             .store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
     320            0 :     }
     321              : }
     322              : 
     323              : /// Periodically write to logs if there are issues with receiving data from the broker.
     324            0 : async fn task_stats(stats: Arc<BrokerStats>) {
     325            0 :     let warn_duration = Duration::from_secs(10);
     326            0 :     let mut ticker = tokio::time::interval(warn_duration);
     327              : 
     328              :     loop {
     329            0 :         tokio::select! {
     330            0 :             _ = ticker.tick() => {
     331            0 :                 let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
     332            0 :                 if last_pulled == 0 {
     333              :                     // no broker updates yet
     334            0 :                     continue;
     335            0 :                 }
     336            0 : 
     337            0 :                 let now = BrokerStats::now_millis();
     338            0 :                 if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
     339            0 :                     let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
     340            0 :                     info!("no broker updates for some time, last update: {:?}", ts);
     341            0 :                 }
     342              :             }
     343              :         }
     344              :     }
     345              : }
        

Generated by: LCOV version 2.1-beta