TLA Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use anyhow::anyhow;
4 : use anyhow::bail;
5 : use anyhow::Context;
6 :
7 : use anyhow::Error;
8 : use anyhow::Result;
9 :
10 : use storage_broker::parse_proto_ttid;
11 :
12 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
13 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
14 : use storage_broker::Request;
15 :
16 : use std::time::Duration;
17 : use std::time::Instant;
18 : use tokio::task::JoinHandle;
19 : use tokio::time::sleep;
20 : use tracing::*;
21 :
22 : use crate::metrics::BROKER_ITERATION_TIMELINES;
23 : use crate::metrics::BROKER_PULLED_UPDATES;
24 : use crate::metrics::BROKER_PUSHED_UPDATES;
25 : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
26 : use crate::GlobalTimelines;
27 : use crate::SafeKeeperConf;
28 :
/// Period, in milliseconds, of the ticker that (re)spawns missing broker push/pull tasks.
const RETRY_INTERVAL_MSEC: u64 = 1_000;
/// Pause, in milliseconds, between successive rounds of pushing timeline info to the broker.
const PUSH_INTERVAL_MSEC: u64 = 1_000;
31 :
32 : /// Push once in a while data about all active timelines to the broker.
33 CBC 485 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
34 485 : let mut client =
35 485 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
36 485 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
37 485 :
38 485 : let outbound = async_stream::stream! {
39 : loop {
40 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
41 : // is under plain mutex. That's ok, all this code is not performance
42 : // sensitive and there is no risk of deadlock as we don't await while
43 : // lock is held.
44 9234 : let now = Instant::now();
45 9234 : let all_tlis = GlobalTimelines::get_all();
46 9234 : let mut n_pushed_tlis = 0;
47 18394 : for tli in &all_tlis {
48 : // filtering alternative futures::stream::iter(all_tlis)
49 : // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
50 : // doesn't look better, and I'm not sure how to do that without collect.
51 9160 : if !tli.is_active().await {
52 499 : continue;
53 8661 : }
54 8661 : let sk_info = tli.get_safekeeper_info(&conf).await;
55 8661 : yield sk_info;
56 8661 : BROKER_PUSHED_UPDATES.inc();
57 8661 : n_pushed_tlis += 1;
58 : }
59 9234 : let elapsed = now.elapsed();
60 9234 :
61 9234 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
62 9234 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
63 9234 :
64 9234 : if elapsed > push_interval / 2 {
65 UBC 0 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
66 CBC 9234 : }
67 :
68 15423 : sleep(push_interval).await;
69 : }
70 : };
71 485 : client
72 485 : .publish_safekeeper_info(Request::new(outbound))
73 485 : .await?;
74 UBC 0 : Ok(())
75 0 : }
76 :
77 : /// Subscribe and fetch all the interesting data from the broker.
78 CBC 485 : async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
79 485 : let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
80 :
81 : // TODO: subscribe only to local timelines instead of all
82 485 : let request = SubscribeSafekeeperInfoRequest {
83 485 : subscription_key: Some(ProtoSubscriptionKey::All(())),
84 485 : };
85 :
86 485 : let mut stream = client
87 485 : .subscribe_safekeeper_info(request)
88 970 : .await
89 485 : .context("subscribe_safekeper_info request failed")?
90 485 : .into_inner();
91 485 :
92 485 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
93 485 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
94 485 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
95 :
96 12449 : while let Some(msg) = stream.message().await? {
97 11964 : let proto_ttid = msg
98 11964 : .tenant_timeline_id
99 11964 : .as_ref()
100 11964 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
101 11964 : let ttid = parse_proto_ttid(proto_ttid)?;
102 11964 : if let Ok(tli) = GlobalTimelines::get(ttid) {
103 : // Note that we also receive *our own* info. That's
104 : // important, as it is used as an indication of live
105 : // connection to the broker.
106 :
107 : // note: there are blocking operations below, but it's considered fine for now
108 11957 : let res = tli.record_safekeeper_info(msg).await;
109 11957 : if res.is_ok() {
110 11957 : ok_counter.inc();
111 11957 : } else {
112 UBC 0 : err_counter.inc();
113 0 : }
114 CBC 11957 : res?;
115 7 : } else {
116 7 : not_found.inc();
117 7 : }
118 : }
119 UBC 0 : bail!("end of stream");
120 0 : }
121 :
122 CBC 485 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
123 485 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
124 :
125 485 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
126 485 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
127 485 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
128 :
129 : // Selecting on JoinHandles requires some squats; is there a better way to
130 : // reap tasks individually?
131 :
132 : // Handling failures in task itself won't catch panic and in Tokio, task's
133 : // panic doesn't kill the whole executor, so it is better to do reaping
134 : // here.
135 9739 : loop {
136 18993 : tokio::select! {
137 9254 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
138 : // was it panic or normal error?
139 : let err = match res {
140 : Ok(res_internal) => res_internal.unwrap_err(),
141 : Err(err_outer) => err_outer.into(),
142 : };
143 UBC 0 : warn!("push task failed: {:?}", err);
144 : push_handle = None;
145 : },
146 CBC 9254 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
147 : // was it panic or normal error?
148 : match res {
149 : Ok(res_internal) => if let Err(err_inner) = res_internal {
150 UBC 0 : warn!("pull task failed: {:?}", err_inner);
151 : }
152 0 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
153 : };
154 : pull_handle = None;
155 : },
156 : _ = ticker.tick() => {
157 : if push_handle.is_none() {
158 : push_handle = Some(tokio::spawn(push_loop(conf.clone())));
159 : }
160 : if pull_handle.is_none() {
161 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
162 : }
163 : }
164 CBC 9739 : }
165 9739 : }
166 : }
|