Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use anyhow::anyhow;
4 : use anyhow::bail;
5 : use anyhow::Context;
6 :
7 : use anyhow::Error;
8 : use anyhow::Result;
9 :
10 : use storage_broker::parse_proto_ttid;
11 :
12 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
13 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
14 : use storage_broker::Request;
15 :
16 : use std::time::Duration;
17 : use std::time::Instant;
18 : use tokio::task::JoinHandle;
19 : use tokio::time::sleep;
20 : use tracing::*;
21 :
22 : use crate::metrics::BROKER_ITERATION_TIMELINES;
23 : use crate::metrics::BROKER_PULLED_UPDATES;
24 : use crate::metrics::BROKER_PUSHED_UPDATES;
25 : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
26 : use crate::GlobalTimelines;
27 : use crate::SafeKeeperConf;
28 :
/// Period, in milliseconds, of the ticker in `task_main` that respawns any
/// push/pull task that is not currently running.
const RETRY_INTERVAL_MSEC: u64 = 1_000;
/// Pause, in milliseconds, between successive push iterations in `push_loop`.
const PUSH_INTERVAL_MSEC: u64 = 1_000;
31 :
32 : /// Push once in a while data about all active timelines to the broker.
33 510 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
34 510 : let mut client =
35 510 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
36 510 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
37 510 :
38 510 : let outbound = async_stream::stream! {
39 : loop {
40 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
41 : // is under plain mutex. That's ok, all this code is not performance
42 : // sensitive and there is no risk of deadlock as we don't await while
43 : // lock is held.
44 8516 : let now = Instant::now();
45 8516 : let all_tlis = GlobalTimelines::get_all();
46 8516 : let mut n_pushed_tlis = 0;
47 16991 : for tli in &all_tlis {
48 : // filtering alternative futures::stream::iter(all_tlis)
49 : // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
50 : // doesn't look better, and I'm not sure how to do that without collect.
51 8475 : if !tli.is_active().await {
52 463 : continue;
53 8012 : }
54 8012 : let sk_info = tli.get_safekeeper_info(&conf).await;
55 8012 : yield sk_info;
56 8012 : BROKER_PUSHED_UPDATES.inc();
57 8012 : n_pushed_tlis += 1;
58 : }
59 8516 : let elapsed = now.elapsed();
60 8516 :
61 8516 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
62 8516 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
63 8516 :
64 8516 : if elapsed > push_interval / 2 {
65 0 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
66 8516 : }
67 :
68 14342 : sleep(push_interval).await;
69 : }
70 : };
71 510 : client
72 510 : .publish_safekeeper_info(Request::new(outbound))
73 510 : .await?;
74 0 : Ok(())
75 0 : }
76 :
77 : /// Subscribe and fetch all the interesting data from the broker.
78 510 : async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
79 510 : let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
80 :
81 : // TODO: subscribe only to local timelines instead of all
82 510 : let request = SubscribeSafekeeperInfoRequest {
83 510 : subscription_key: Some(ProtoSubscriptionKey::All(())),
84 510 : };
85 :
86 510 : let mut stream = client
87 510 : .subscribe_safekeeper_info(request)
88 1020 : .await
89 510 : .context("subscribe_safekeper_info request failed")?
90 510 : .into_inner();
91 510 :
92 510 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
93 510 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
94 510 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
95 :
96 11495 : while let Some(msg) = stream.message().await? {
97 10985 : let proto_ttid = msg
98 10985 : .tenant_timeline_id
99 10985 : .as_ref()
100 10985 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
101 10985 : let ttid = parse_proto_ttid(proto_ttid)?;
102 10985 : if let Ok(tli) = GlobalTimelines::get(ttid) {
103 : // Note that we also receive *our own* info. That's
104 : // important, as it is used as an indication of live
105 : // connection to the broker.
106 :
107 : // note: there are blocking operations below, but it's considered fine for now
108 10979 : let res = tli.record_safekeeper_info(msg).await;
109 10979 : if res.is_ok() {
110 10979 : ok_counter.inc();
111 10979 : } else {
112 0 : err_counter.inc();
113 0 : }
114 10979 : res?;
115 6 : } else {
116 6 : not_found.inc();
117 6 : }
118 : }
119 0 : bail!("end of stream");
120 0 : }
121 :
122 510 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
123 510 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
124 :
125 510 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
126 510 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
127 510 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
128 :
129 : // Selecting on JoinHandles requires some squats; is there a better way to
130 : // reap tasks individually?
131 :
132 : // Handling failures in task itself won't catch panic and in Tokio, task's
133 : // panic doesn't kill the whole executor, so it is better to do reaping
134 : // here.
135 9041 : loop {
136 17572 : tokio::select! {
137 8531 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
138 : // was it panic or normal error?
139 : let err = match res {
140 : Ok(res_internal) => res_internal.unwrap_err(),
141 : Err(err_outer) => err_outer.into(),
142 : };
143 0 : warn!("push task failed: {:?}", err);
144 : push_handle = None;
145 : },
146 8531 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
147 : // was it panic or normal error?
148 : match res {
149 : Ok(res_internal) => if let Err(err_inner) = res_internal {
150 0 : warn!("pull task failed: {:?}", err_inner);
151 : }
152 0 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
153 : };
154 : pull_handle = None;
155 : },
156 : _ = ticker.tick() => {
157 : if push_handle.is_none() {
158 : push_handle = Some(tokio::spawn(push_loop(conf.clone())));
159 : }
160 : if pull_handle.is_none() {
161 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone())));
162 : }
163 : }
164 9041 : }
165 9041 : }
166 : }
|