Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use anyhow::anyhow;
4 : use anyhow::bail;
5 : use anyhow::Context;
6 :
7 : use anyhow::Error;
8 : use anyhow::Result;
9 :
10 : use storage_broker::parse_proto_ttid;
11 :
12 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
13 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
14 : use storage_broker::Request;
15 :
16 : use std::time::Duration;
17 : use std::time::Instant;
18 : use tokio::task::JoinHandle;
19 : use tokio::time::sleep;
20 : use tracing::*;
21 :
22 : use crate::metrics::BROKER_ITERATION_TIMELINES;
23 : use crate::metrics::BROKER_PULLED_UPDATES;
24 : use crate::metrics::BROKER_PUSHED_UPDATES;
25 : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
26 : use crate::GlobalTimelines;
27 : use crate::SafeKeeperConf;
28 :
/// Period, in milliseconds, of the supervisor ticker in `task_main` that
/// (re)spawns the push and pull tasks when they are not running.
const RETRY_INTERVAL_MSEC: u64 = 1_000;
/// Pause, in milliseconds, that `push_loop` sleeps after each round of
/// publishing timeline updates to the broker.
const PUSH_INTERVAL_MSEC: u64 = 1_000;
31 :
32 : /// Push once in a while data about all active timelines to the broker.
33 509 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
34 509 : let mut client =
35 509 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
36 509 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
37 509 :
38 509 : let outbound = async_stream::stream! {
39 : loop {
40 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
41 : // is under plain mutex. That's ok, all this code is not performance
42 : // sensitive and there is no risk of deadlock as we don't await while
43 : // lock is held.
44 8312 : let now = Instant::now();
45 8312 : let all_tlis = GlobalTimelines::get_all();
46 8312 : let mut n_pushed_tlis = 0;
47 16629 : for tli in &all_tlis {
48 : // filtering alternative futures::stream::iter(all_tlis)
49 : // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
50 : // doesn't look better, and I'm not sure how to do that without collect.
51 8317 : if !tli.is_active().await {
52 455 : continue;
53 7862 : }
54 7862 : let sk_info = tli.get_safekeeper_info(&conf).await;
55 7862 : yield sk_info;
56 7862 : BROKER_PUSHED_UPDATES.inc();
57 7862 : n_pushed_tlis += 1;
58 : }
59 8312 : let elapsed = now.elapsed();
60 8312 :
61 8312 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
62 8312 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
63 8312 :
64 8312 : if elapsed > push_interval / 2 {
65 0 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
66 8312 : }
67 :
68 14035 : sleep(push_interval).await;
69 : }
70 : };
71 509 : client
72 509 : .publish_safekeeper_info(Request::new(outbound))
73 509 : .await?;
74 0 : Ok(())
75 1 : }
76 :
77 : /// Subscribe and fetch all the interesting data from the broker.
78 509 : async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
79 509 : let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
80 :
81 : // TODO: subscribe only to local timelines instead of all
82 509 : let request = SubscribeSafekeeperInfoRequest {
83 509 : subscription_key: Some(ProtoSubscriptionKey::All(())),
84 509 : };
85 :
86 509 : let mut stream = client
87 509 : .subscribe_safekeeper_info(request)
88 1016 : .await
89 509 : .context("subscribe_safekeper_info request failed")?
90 508 : .into_inner();
91 508 :
92 508 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
93 508 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
94 508 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
95 :
96 11619 : while let Some(msg) = stream.message().await? {
97 11111 : let proto_ttid = msg
98 11111 : .tenant_timeline_id
99 11111 : .as_ref()
100 11111 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
101 11111 : let ttid = parse_proto_ttid(proto_ttid)?;
102 11111 : if let Ok(tli) = GlobalTimelines::get(ttid) {
103 : // Note that we also receive *our own* info. That's
104 : // important, as it is used as an indication of live
105 : // connection to the broker.
106 :
107 : // note: there are blocking operations below, but it's considered fine for now
108 11101 : let res = tli.record_safekeeper_info(msg).await;
109 11101 : if res.is_ok() {
110 11101 : ok_counter.inc();
111 11101 : } else {
112 0 : err_counter.inc();
113 0 : }
114 11101 : res?;
115 10 : } else {
116 10 : not_found.inc();
117 10 : }
118 : }
119 0 : bail!("end of stream");
120 1 : }
121 :
122 508 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
123 508 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
124 :
125 508 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
126 508 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
127 508 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
128 :
129 : // Selecting on JoinHandles requires some squats; is there a better way to
130 : // reap tasks individually?
131 :
132 : // Handling failures in task itself won't catch panic and in Tokio, task's
133 : // panic doesn't kill the whole executor, so it is better to do reaping
134 : // here.
135 8843 : loop {
136 17177 : tokio::select! {
137 8333 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
138 : // was it panic or normal error?
139 : let err = match res {
140 : Ok(res_internal) => res_internal.unwrap_err(),
141 : Err(err_outer) => err_outer.into(),
142 : };
143 1 : warn!("push task failed: {:?}", err);
144 : push_handle = None;
145 : },
146 8334 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
147 : // was it panic or normal error?
148 : match res {
149 : Ok(res_internal) => if let Err(err_inner) = res_internal {
150 1 : warn!("pull task failed: {:?}", err_inner);
151 : }
152 0 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
153 : };
154 : pull_handle = None;
155 : },
156 : _ = ticker.tick() => {
157 : if push_handle.is_none() {
158 : push_handle = Some(tokio::spawn(push_loop(conf.clone())));
159 : }
160 : if pull_handle.is_none() {
161 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
162 : }
163 : }
164 8843 : }
165 8843 : }
166 : }
|