LCOV - code coverage report
Current view: top level - safekeeper/src - broker.rs (source / functions) Coverage Total Hit
Test: 496e96cdfff2df79370229591d6427cda12fde29.info Lines: 0.0 % 148 0
Test Date: 2024-05-21 18:28:29 Functions: 0.0 % 18 0

            Line data    Source code
       1              : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
       2              : 
       3              : use anyhow::anyhow;
       4              : use anyhow::bail;
       5              : use anyhow::Context;
       6              : 
       7              : use anyhow::Error;
       8              : use anyhow::Result;
       9              : 
      10              : use storage_broker::parse_proto_ttid;
      11              : 
      12              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      13              : use storage_broker::proto::FilterTenantTimelineId;
      14              : use storage_broker::proto::MessageType;
      15              : use storage_broker::proto::SafekeeperDiscoveryResponse;
      16              : use storage_broker::proto::SubscribeByFilterRequest;
      17              : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      18              : use storage_broker::proto::TypeSubscription;
      19              : use storage_broker::proto::TypedMessage;
      20              : use storage_broker::Request;
      21              : 
      22              : use std::sync::atomic::AtomicU64;
      23              : use std::sync::Arc;
      24              : use std::time::Duration;
      25              : use std::time::Instant;
      26              : use std::time::UNIX_EPOCH;
      27              : use tokio::task::JoinHandle;
      28              : use tokio::time::sleep;
      29              : use tracing::*;
      30              : 
      31              : use crate::metrics::BROKER_ITERATION_TIMELINES;
      32              : use crate::metrics::BROKER_PULLED_UPDATES;
      33              : use crate::metrics::BROKER_PUSHED_UPDATES;
      34              : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
      35              : use crate::GlobalTimelines;
      36              : use crate::SafeKeeperConf;
      37              : 
      38              : const RETRY_INTERVAL_MSEC: u64 = 1000;
      39              : const PUSH_INTERVAL_MSEC: u64 = 1000;
      40              : 
      41              : /// Push once in a while data about all active timelines to the broker.
      42            0 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
      43            0 :     if conf.disable_periodic_broker_push {
      44            0 :         info!("broker push_loop is disabled, doing nothing...");
      45            0 :         futures::future::pending::<()>().await; // sleep forever
      46            0 :         return Ok(());
      47            0 :     }
      48              : 
      49            0 :     let mut client =
      50            0 :         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
      51            0 :     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
      52            0 : 
      53            0 :     let outbound = async_stream::stream! {
      54              :         loop {
      55              :             // Note: we lock runtime here and in timeline methods as GlobalTimelines
      56              :             // is under plain mutex. That's ok, all this code is not performance
      57              :             // sensitive and there is no risk of deadlock as we don't await while
      58              :             // lock is held.
      59              :             let now = Instant::now();
      60              :             let all_tlis = GlobalTimelines::get_all();
      61              :             let mut n_pushed_tlis = 0;
      62              :             for tli in &all_tlis {
      63              :                 // filtering alternative futures::stream::iter(all_tlis)
      64              :                 //   .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
      65              :                 // doesn't look better, and I'm not sure how to do that without collect.
      66              :                 if !tli.is_active().await {
      67              :                     continue;
      68              :                 }
      69              :                 let sk_info = tli.get_safekeeper_info(&conf).await;
      70              :                 yield sk_info;
      71              :                 BROKER_PUSHED_UPDATES.inc();
      72              :                 n_pushed_tlis += 1;
      73              :             }
      74              :             let elapsed = now.elapsed();
      75              : 
      76              :             BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
      77              :             BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
      78              : 
      79              :             if elapsed > push_interval / 2 {
      80              :                 info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
      81              :             }
      82              : 
      83              :             sleep(push_interval).await;
      84              :         }
      85              :     };
      86            0 :     client
      87            0 :         .publish_safekeeper_info(Request::new(outbound))
      88            0 :         .await?;
      89            0 :     Ok(())
      90            0 : }
      91              : 
      92              : /// Subscribe and fetch all the interesting data from the broker.
      93            0 : async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
      94            0 :     let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
      95              : 
      96              :     // TODO: subscribe only to local timelines instead of all
      97            0 :     let request = SubscribeSafekeeperInfoRequest {
      98            0 :         subscription_key: Some(ProtoSubscriptionKey::All(())),
      99            0 :     };
     100              : 
     101            0 :     let mut stream = client
     102            0 :         .subscribe_safekeeper_info(request)
     103            0 :         .await
     104            0 :         .context("subscribe_safekeper_info request failed")?
     105            0 :         .into_inner();
     106            0 : 
     107            0 :     let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
     108            0 :     let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
     109            0 :     let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
     110              : 
     111            0 :     while let Some(msg) = stream.message().await? {
     112            0 :         stats.update_pulled();
     113              : 
     114            0 :         let proto_ttid = msg
     115            0 :             .tenant_timeline_id
     116            0 :             .as_ref()
     117            0 :             .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
     118            0 :         let ttid = parse_proto_ttid(proto_ttid)?;
     119            0 :         if let Ok(tli) = GlobalTimelines::get(ttid) {
     120              :             // Note that we also receive *our own* info. That's
     121              :             // important, as it is used as an indication of live
     122              :             // connection to the broker.
     123              : 
     124              :             // note: there are blocking operations below, but it's considered fine for now
     125            0 :             let res = tli.record_safekeeper_info(msg).await;
     126            0 :             if res.is_ok() {
     127            0 :                 ok_counter.inc();
     128            0 :             } else {
     129            0 :                 err_counter.inc();
     130            0 :             }
     131            0 :             res?;
     132            0 :         } else {
     133            0 :             not_found.inc();
     134            0 :         }
     135              :     }
     136            0 :     bail!("end of stream");
     137            0 : }
     138              : 
     139              : /// Process incoming discover requests. This is done in a separate task to avoid
     140              : /// interfering with the normal pull/push loops.
     141            0 : async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
     142            0 :     let mut client =
     143            0 :         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
     144              : 
     145            0 :     let request = SubscribeByFilterRequest {
     146            0 :         types: vec![TypeSubscription {
     147            0 :             r#type: MessageType::SafekeeperDiscoveryRequest as i32,
     148            0 :         }],
     149            0 :         tenant_timeline_id: Some(FilterTenantTimelineId {
     150            0 :             enabled: false,
     151            0 :             tenant_timeline_id: None,
     152            0 :         }),
     153            0 :     };
     154              : 
     155            0 :     let mut stream = client
     156            0 :         .subscribe_by_filter(request)
     157            0 :         .await
     158            0 :         .context("subscribe_by_filter request failed")?
     159            0 :         .into_inner();
     160            0 : 
     161            0 :     let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
     162              : 
     163            0 :     while let Some(typed_msg) = stream.message().await? {
     164            0 :         stats.update_pulled();
     165            0 : 
     166            0 :         match typed_msg.r#type() {
     167              :             MessageType::SafekeeperDiscoveryRequest => {
     168            0 :                 let msg = typed_msg
     169            0 :                     .safekeeper_discovery_request
     170            0 :                     .expect("proto type mismatch from broker message");
     171              : 
     172            0 :                 let proto_ttid = msg
     173            0 :                     .tenant_timeline_id
     174            0 :                     .as_ref()
     175            0 :                     .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
     176            0 :                 let ttid = parse_proto_ttid(proto_ttid)?;
     177            0 :                 if let Ok(tli) = GlobalTimelines::get(ttid) {
     178              :                     // we received a discovery request for a timeline we know about
     179            0 :                     discover_counter.inc();
     180              : 
     181              :                     // create and reply with discovery response
     182            0 :                     let sk_info = tli.get_safekeeper_info(&conf).await;
     183            0 :                     let response = SafekeeperDiscoveryResponse {
     184            0 :                         safekeeper_id: sk_info.safekeeper_id,
     185            0 :                         tenant_timeline_id: sk_info.tenant_timeline_id,
     186            0 :                         commit_lsn: sk_info.commit_lsn,
     187            0 :                         safekeeper_connstr: sk_info.safekeeper_connstr,
     188            0 :                         availability_zone: sk_info.availability_zone,
     189            0 :                         standby_horizon: 0,
     190            0 :                     };
     191            0 : 
     192            0 :                     // note this is a blocking call
     193            0 :                     client
     194            0 :                         .publish_one(TypedMessage {
     195            0 :                             r#type: MessageType::SafekeeperDiscoveryResponse as i32,
     196            0 :                             safekeeper_timeline_info: None,
     197            0 :                             safekeeper_discovery_request: None,
     198            0 :                             safekeeper_discovery_response: Some(response),
     199            0 :                         })
     200            0 :                         .await?;
     201            0 :                 }
     202              :             }
     203              : 
     204              :             _ => {
     205            0 :                 warn!(
     206            0 :                     "unexpected message type i32 {}, {:?}",
     207            0 :                     typed_msg.r#type,
     208            0 :                     typed_msg.r#type()
     209              :                 );
     210              :             }
     211              :         }
     212              :     }
     213            0 :     bail!("end of stream");
     214            0 : }
     215              : 
     216            0 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
     217            0 :     info!("started, broker endpoint {:?}", conf.broker_endpoint);
     218              : 
     219            0 :     let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
     220            0 :     let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
     221            0 :     let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
     222            0 :     let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
     223            0 : 
     224            0 :     let stats = Arc::new(BrokerStats::new());
     225            0 :     let stats_task = task_stats(stats.clone());
     226            0 :     tokio::pin!(stats_task);
     227              : 
     228              :     // Selecting on JoinHandles requires some squats; is there a better way to
     229              :     // reap tasks individually?
     230              : 
     231              :     // Handling failures in task itself won't catch panic and in Tokio, task's
     232              :     // panic doesn't kill the whole executor, so it is better to do reaping
     233              :     // here.
     234            0 :     loop {
     235            0 :         tokio::select! {
     236            0 :                 res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
     237              :                     // was it panic or normal error?
     238              :                     let err = match res {
     239              :                         Ok(res_internal) => res_internal.unwrap_err(),
     240              :                         Err(err_outer) => err_outer.into(),
     241              :                     };
     242              :                     warn!("push task failed: {:?}", err);
     243              :                     push_handle = None;
     244              :                 },
     245            0 :                 res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
     246              :                     // was it panic or normal error?
     247              :                     match res {
     248              :                         Ok(res_internal) => if let Err(err_inner) = res_internal {
     249              :                             warn!("pull task failed: {:?}", err_inner);
     250              :                         }
     251              :                         Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
     252              :                     };
     253              :                     pull_handle = None;
     254              :                 },
     255            0 :                 res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
     256              :                     // was it panic or normal error?
     257              :                     match res {
     258              :                         Ok(res_internal) => if let Err(err_inner) = res_internal {
     259              :                             warn!("discover task failed: {:?}", err_inner);
     260              :                         }
     261              :                         Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
     262              :                     };
     263              :                     discover_handle = None;
     264              :                 },
     265              :                 _ = ticker.tick() => {
     266              :                     if push_handle.is_none() {
     267              :                         push_handle = Some(tokio::spawn(push_loop(conf.clone())));
     268              :                     }
     269              :                     if pull_handle.is_none() {
     270              :                         pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
     271              :                     }
     272              :                     if discover_handle.is_none() {
     273              :                         discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
     274              :                     }
     275              :                 },
     276              :                 _ = &mut stats_task => {}
     277            0 :         }
     278            0 :     }
     279              : }
     280              : 
     281              : struct BrokerStats {
     282              :     /// Timestamp of the last received message from the broker.
     283              :     last_pulled_ts: AtomicU64,
     284              : }
     285              : 
     286              : impl BrokerStats {
     287            0 :     fn new() -> Self {
     288            0 :         BrokerStats {
     289            0 :             last_pulled_ts: AtomicU64::new(0),
     290            0 :         }
     291            0 :     }
     292              : 
     293            0 :     fn now_millis() -> u64 {
     294            0 :         std::time::SystemTime::now()
     295            0 :             .duration_since(UNIX_EPOCH)
     296            0 :             .expect("time is before epoch")
     297            0 :             .as_millis() as u64
     298            0 :     }
     299              : 
     300              :     /// Update last_pulled timestamp to current time.
     301            0 :     fn update_pulled(&self) {
     302            0 :         self.last_pulled_ts
     303            0 :             .store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
     304            0 :     }
     305              : }
     306              : 
     307              : /// Periodically write to logs if there are issues with receiving data from the broker.
     308            0 : async fn task_stats(stats: Arc<BrokerStats>) {
     309            0 :     let warn_duration = Duration::from_secs(10);
     310            0 :     let mut ticker = tokio::time::interval(warn_duration);
     311              : 
     312            0 :     loop {
     313            0 :         tokio::select! {
     314              :             _ = ticker.tick() => {
     315              :                 let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
     316              :                 if last_pulled == 0 {
     317              :                     // no broker updates yet
     318              :                     continue;
     319              :                 }
     320              : 
     321              :                 let now = BrokerStats::now_millis();
     322              :                 if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
     323              :                     let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
     324              :                     info!("no broker updates for some time, last update: {:?}", ts);
     325              :                 }
     326              :             }
     327            0 :         }
     328            0 :     }
     329              : }
        

Generated by: LCOV version 2.1-beta