LCOV - differential code coverage report
Current view: top level - safekeeper/src - broker.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 87.2 % 78 68 10 68
Current Date: 2024-01-09 02:06:09 Functions: 68.8 % 16 11 5 11
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
       2                 : 
       3                 : use anyhow::anyhow;
       4                 : use anyhow::bail;
       5                 : use anyhow::Context;
       6                 : 
       7                 : use anyhow::Error;
       8                 : use anyhow::Result;
       9                 : 
      10                 : use storage_broker::parse_proto_ttid;
      11                 : 
      12                 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      13                 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      14                 : use storage_broker::Request;
      15                 : 
      16                 : use std::time::Duration;
      17                 : use std::time::Instant;
      18                 : use tokio::task::JoinHandle;
      19                 : use tokio::time::sleep;
      20                 : use tracing::*;
      21                 : 
      22                 : use crate::metrics::BROKER_ITERATION_TIMELINES;
      23                 : use crate::metrics::BROKER_PULLED_UPDATES;
      24                 : use crate::metrics::BROKER_PUSHED_UPDATES;
      25                 : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
      26                 : use crate::GlobalTimelines;
      27                 : use crate::SafeKeeperConf;
      28                 : 
      /// Period, in milliseconds, of the ticker in `task_main` that (re)spawns a
      /// missing push or pull task.
      const RETRY_INTERVAL_MSEC: u64 = 1_000;
      /// Pause, in milliseconds, that `push_loop` sleeps between two consecutive
      /// rounds of pushing timeline updates.
      const PUSH_INTERVAL_MSEC: u64 = 1_000;
      31                 : 
      32                 : /// Push once in a while data about all active timelines to the broker.
      33 CBC         485 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
      34             485 :     let mut client =
      35             485 :         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
      36             485 :     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
      37             485 : 
      38             485 :     let outbound = async_stream::stream! {
      39                 :         loop {
      40                 :             // Note: we lock runtime here and in timeline methods as GlobalTimelines
      41                 :             // is under plain mutex. That's ok, all this code is not performance
      42                 :             // sensitive and there is no risk of deadlock as we don't await while
      43                 :             // lock is held.
      44            9234 :             let now = Instant::now();
      45            9234 :             let all_tlis = GlobalTimelines::get_all();
      46            9234 :             let mut n_pushed_tlis = 0;
      47           18394 :             for tli in &all_tlis {
      48                 :                 // filtering alternative futures::stream::iter(all_tlis)
      49                 :                 //   .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
      50                 :                 // doesn't look better, and I'm not sure how to do that without collect.
      51            9160 :                 if !tli.is_active().await {
      52             499 :                     continue;
      53            8661 :                 }
      54            8661 :                 let sk_info = tli.get_safekeeper_info(&conf).await;
      55            8661 :                 yield sk_info;
      56            8661 :                 BROKER_PUSHED_UPDATES.inc();
      57            8661 :                 n_pushed_tlis += 1;
      58                 :             }
      59            9234 :             let elapsed = now.elapsed();
      60            9234 : 
      61            9234 :             BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
      62            9234 :             BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
      63            9234 : 
      64            9234 :             if elapsed > push_interval / 2 {
      65 UBC           0 :                 info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
      66 CBC        9234 :             }
      67                 : 
      68           15423 :             sleep(push_interval).await;
      69                 :         }
      70                 :     };
      71             485 :     client
      72             485 :         .publish_safekeeper_info(Request::new(outbound))
      73             485 :         .await?;
      74 UBC           0 :     Ok(())
      75               0 : }
      76                 : 
      77                 : /// Subscribe and fetch all the interesting data from the broker.
      78 CBC         485 : async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
      79             485 :     let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
      80                 : 
      81                 :     // TODO: subscribe only to local timelines instead of all
      82             485 :     let request = SubscribeSafekeeperInfoRequest {
      83             485 :         subscription_key: Some(ProtoSubscriptionKey::All(())),
      84             485 :     };
      85                 : 
      86             485 :     let mut stream = client
      87             485 :         .subscribe_safekeeper_info(request)
      88             970 :         .await
      89             485 :         .context("subscribe_safekeper_info request failed")?
      90             485 :         .into_inner();
      91             485 : 
      92             485 :     let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
      93             485 :     let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
      94             485 :     let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
      95                 : 
      96           12449 :     while let Some(msg) = stream.message().await? {
      97           11964 :         let proto_ttid = msg
      98           11964 :             .tenant_timeline_id
      99           11964 :             .as_ref()
     100           11964 :             .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
     101           11964 :         let ttid = parse_proto_ttid(proto_ttid)?;
     102           11964 :         if let Ok(tli) = GlobalTimelines::get(ttid) {
     103                 :             // Note that we also receive *our own* info. That's
     104                 :             // important, as it is used as an indication of live
     105                 :             // connection to the broker.
     106                 : 
     107                 :             // note: there are blocking operations below, but it's considered fine for now
     108           11957 :             let res = tli.record_safekeeper_info(msg).await;
     109           11957 :             if res.is_ok() {
     110           11957 :                 ok_counter.inc();
     111           11957 :             } else {
     112 UBC           0 :                 err_counter.inc();
     113               0 :             }
     114 CBC       11957 :             res?;
     115               7 :         } else {
     116               7 :             not_found.inc();
     117               7 :         }
     118                 :     }
     119 UBC           0 :     bail!("end of stream");
     120               0 : }
     121                 : 
     122 CBC         485 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
     123             485 :     info!("started, broker endpoint {:?}", conf.broker_endpoint);
     124                 : 
     125             485 :     let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
     126             485 :     let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
     127             485 :     let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
     128                 : 
     129                 :     // Selecting on JoinHandles requires some squats; is there a better way to
     130                 :     // reap tasks individually?
     131                 : 
     132                 :     // Handling failures in task itself won't catch panic and in Tokio, task's
     133                 :     // panic doesn't kill the whole executor, so it is better to do reaping
     134                 :     // here.
     135            9739 :     loop {
     136           18993 :         tokio::select! {
     137            9254 :                 res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
     138                 :                     // was it panic or normal error?
     139                 :                     let err = match res {
     140                 :                         Ok(res_internal) => res_internal.unwrap_err(),
     141                 :                         Err(err_outer) => err_outer.into(),
     142                 :                     };
     143 UBC           0 :                     warn!("push task failed: {:?}", err);
     144                 :                     push_handle = None;
     145                 :                 },
     146 CBC        9254 :                 res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
     147                 :                     // was it panic or normal error?
     148                 :                     match res {
     149                 :                         Ok(res_internal) => if let Err(err_inner) = res_internal {
     150 UBC           0 :                             warn!("pull task failed: {:?}", err_inner);
     151                 :                         }
     152               0 :                         Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
     153                 :                     };
     154                 :                     pull_handle = None;
     155                 :                 },
     156                 :                 _ = ticker.tick() => {
     157                 :                     if push_handle.is_none() {
     158                 :                         push_handle = Some(tokio::spawn(push_loop(conf.clone())));
     159                 :                     }
     160                 :                     if pull_handle.is_none() {
     161                 :                         pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
     162                 :                     }
     163                 :             }
     164 CBC        9739 :         }
     165            9739 :     }
     166                 : }
        

Generated by: LCOV version 2.1-beta