Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use anyhow::anyhow;
4 : use anyhow::bail;
5 : use anyhow::Context;
6 :
7 : use anyhow::Error;
8 : use anyhow::Result;
9 :
10 : use storage_broker::parse_proto_ttid;
11 :
12 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
13 : use storage_broker::proto::FilterTenantTimelineId;
14 : use storage_broker::proto::MessageType;
15 : use storage_broker::proto::SafekeeperDiscoveryResponse;
16 : use storage_broker::proto::SubscribeByFilterRequest;
17 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
18 : use storage_broker::proto::TypeSubscription;
19 : use storage_broker::proto::TypedMessage;
20 : use storage_broker::Request;
21 :
22 : use std::sync::atomic::AtomicU64;
23 : use std::sync::Arc;
24 : use std::time::Duration;
25 : use std::time::Instant;
26 : use std::time::UNIX_EPOCH;
27 : use tokio::task::JoinHandle;
28 : use tokio::time::sleep;
29 : use tracing::*;
30 :
31 : use crate::metrics::BROKER_ITERATION_TIMELINES;
32 : use crate::metrics::BROKER_PULLED_UPDATES;
33 : use crate::metrics::BROKER_PUSHED_UPDATES;
34 : use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS;
35 : use crate::GlobalTimelines;
36 : use crate::SafeKeeperConf;
37 :
38 : const RETRY_INTERVAL_MSEC: u64 = 1000;
39 : const PUSH_INTERVAL_MSEC: u64 = 1000;
40 :
41 : /// Push once in a while data about all active timelines to the broker.
42 0 : async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
43 0 : if conf.disable_periodic_broker_push {
44 0 : info!("broker push_loop is disabled, doing nothing...");
45 0 : futures::future::pending::<()>().await; // sleep forever
46 0 : return Ok(());
47 0 : }
48 :
49 0 : let mut client =
50 0 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
51 0 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
52 0 :
53 0 : let outbound = async_stream::stream! {
54 : loop {
55 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
56 : // is under plain mutex. That's ok, all this code is not performance
57 : // sensitive and there is no risk of deadlock as we don't await while
58 : // lock is held.
59 : let now = Instant::now();
60 : let all_tlis = GlobalTimelines::get_all();
61 : let mut n_pushed_tlis = 0;
62 : for tli in &all_tlis {
63 : // filtering alternative futures::stream::iter(all_tlis)
64 : // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
65 : // doesn't look better, and I'm not sure how to do that without collect.
66 : if !tli.is_active().await {
67 : continue;
68 : }
69 : let sk_info = tli.get_safekeeper_info(&conf).await;
70 : yield sk_info;
71 : BROKER_PUSHED_UPDATES.inc();
72 : n_pushed_tlis += 1;
73 : }
74 : let elapsed = now.elapsed();
75 :
76 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
77 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
78 :
79 : if elapsed > push_interval / 2 {
80 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
81 : }
82 :
83 : sleep(push_interval).await;
84 : }
85 : };
86 0 : client
87 0 : .publish_safekeeper_info(Request::new(outbound))
88 0 : .await?;
89 0 : Ok(())
90 0 : }
91 :
92 : /// Subscribe and fetch all the interesting data from the broker.
93 0 : async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
94 0 : let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
95 :
96 : // TODO: subscribe only to local timelines instead of all
97 0 : let request = SubscribeSafekeeperInfoRequest {
98 0 : subscription_key: Some(ProtoSubscriptionKey::All(())),
99 0 : };
100 :
101 0 : let mut stream = client
102 0 : .subscribe_safekeeper_info(request)
103 0 : .await
104 0 : .context("subscribe_safekeper_info request failed")?
105 0 : .into_inner();
106 0 :
107 0 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
108 0 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
109 0 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
110 :
111 0 : while let Some(msg) = stream.message().await? {
112 0 : stats.update_pulled();
113 :
114 0 : let proto_ttid = msg
115 0 : .tenant_timeline_id
116 0 : .as_ref()
117 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
118 0 : let ttid = parse_proto_ttid(proto_ttid)?;
119 0 : if let Ok(tli) = GlobalTimelines::get(ttid) {
120 : // Note that we also receive *our own* info. That's
121 : // important, as it is used as an indication of live
122 : // connection to the broker.
123 :
124 : // note: there are blocking operations below, but it's considered fine for now
125 0 : let res = tli.record_safekeeper_info(msg).await;
126 0 : if res.is_ok() {
127 0 : ok_counter.inc();
128 0 : } else {
129 0 : err_counter.inc();
130 0 : }
131 0 : res?;
132 0 : } else {
133 0 : not_found.inc();
134 0 : }
135 : }
136 0 : bail!("end of stream");
137 0 : }
138 :
139 : /// Process incoming discover requests. This is done in a separate task to avoid
140 : /// interfering with the normal pull/push loops.
141 0 : async fn discover_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
142 0 : let mut client =
143 0 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
144 :
145 0 : let request = SubscribeByFilterRequest {
146 0 : types: vec![TypeSubscription {
147 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
148 0 : }],
149 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
150 0 : enabled: false,
151 0 : tenant_timeline_id: None,
152 0 : }),
153 0 : };
154 :
155 0 : let mut stream = client
156 0 : .subscribe_by_filter(request)
157 0 : .await
158 0 : .context("subscribe_by_filter request failed")?
159 0 : .into_inner();
160 0 :
161 0 : let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
162 :
163 0 : while let Some(typed_msg) = stream.message().await? {
164 0 : stats.update_pulled();
165 0 :
166 0 : match typed_msg.r#type() {
167 : MessageType::SafekeeperDiscoveryRequest => {
168 0 : let msg = typed_msg
169 0 : .safekeeper_discovery_request
170 0 : .expect("proto type mismatch from broker message");
171 :
172 0 : let proto_ttid = msg
173 0 : .tenant_timeline_id
174 0 : .as_ref()
175 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
176 0 : let ttid = parse_proto_ttid(proto_ttid)?;
177 0 : if let Ok(tli) = GlobalTimelines::get(ttid) {
178 : // we received a discovery request for a timeline we know about
179 0 : discover_counter.inc();
180 :
181 : // create and reply with discovery response
182 0 : let sk_info = tli.get_safekeeper_info(&conf).await;
183 0 : let response = SafekeeperDiscoveryResponse {
184 0 : safekeeper_id: sk_info.safekeeper_id,
185 0 : tenant_timeline_id: sk_info.tenant_timeline_id,
186 0 : commit_lsn: sk_info.commit_lsn,
187 0 : safekeeper_connstr: sk_info.safekeeper_connstr,
188 0 : availability_zone: sk_info.availability_zone,
189 0 : };
190 0 :
191 0 : // note this is a blocking call
192 0 : client
193 0 : .publish_one(TypedMessage {
194 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
195 0 : safekeeper_timeline_info: None,
196 0 : safekeeper_discovery_request: None,
197 0 : safekeeper_discovery_response: Some(response),
198 0 : })
199 0 : .await?;
200 0 : }
201 : }
202 :
203 : _ => {
204 0 : warn!(
205 0 : "unexpected message type i32 {}, {:?}",
206 0 : typed_msg.r#type,
207 0 : typed_msg.r#type()
208 : );
209 : }
210 : }
211 : }
212 0 : bail!("end of stream");
213 0 : }
214 :
215 0 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
216 0 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
217 :
218 0 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
219 0 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
220 0 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
221 0 : let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
222 0 :
223 0 : let stats = Arc::new(BrokerStats::new());
224 0 : let stats_task = task_stats(stats.clone());
225 0 : tokio::pin!(stats_task);
226 :
227 : // Selecting on JoinHandles requires some squats; is there a better way to
228 : // reap tasks individually?
229 :
230 : // Handling failures in task itself won't catch panic and in Tokio, task's
231 : // panic doesn't kill the whole executor, so it is better to do reaping
232 : // here.
233 0 : loop {
234 0 : tokio::select! {
235 0 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
236 : // was it panic or normal error?
237 : let err = match res {
238 : Ok(res_internal) => res_internal.unwrap_err(),
239 : Err(err_outer) => err_outer.into(),
240 : };
241 : warn!("push task failed: {:?}", err);
242 : push_handle = None;
243 : },
244 0 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
245 : // was it panic or normal error?
246 : match res {
247 : Ok(res_internal) => if let Err(err_inner) = res_internal {
248 : warn!("pull task failed: {:?}", err_inner);
249 : }
250 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
251 : };
252 : pull_handle = None;
253 : },
254 0 : res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
255 : // was it panic or normal error?
256 : match res {
257 : Ok(res_internal) => if let Err(err_inner) = res_internal {
258 : warn!("discover task failed: {:?}", err_inner);
259 : }
260 : Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
261 : };
262 : discover_handle = None;
263 : },
264 : _ = ticker.tick() => {
265 : if push_handle.is_none() {
266 : push_handle = Some(tokio::spawn(push_loop(conf.clone())));
267 : }
268 : if pull_handle.is_none() {
269 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone())));
270 : }
271 : if discover_handle.is_none() {
272 : discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone())));
273 : }
274 : },
275 : _ = &mut stats_task => {}
276 0 : }
277 0 : }
278 : }
279 :
280 : struct BrokerStats {
281 : /// Timestamp of the last received message from the broker.
282 : last_pulled_ts: AtomicU64,
283 : }
284 :
285 : impl BrokerStats {
286 0 : fn new() -> Self {
287 0 : BrokerStats {
288 0 : last_pulled_ts: AtomicU64::new(0),
289 0 : }
290 0 : }
291 :
292 0 : fn now_millis() -> u64 {
293 0 : std::time::SystemTime::now()
294 0 : .duration_since(UNIX_EPOCH)
295 0 : .expect("time is before epoch")
296 0 : .as_millis() as u64
297 0 : }
298 :
299 : /// Update last_pulled timestamp to current time.
300 0 : fn update_pulled(&self) {
301 0 : self.last_pulled_ts
302 0 : .store(Self::now_millis(), std::sync::atomic::Ordering::Relaxed);
303 0 : }
304 : }
305 :
306 : /// Periodically write to logs if there are issues with receiving data from the broker.
307 0 : async fn task_stats(stats: Arc<BrokerStats>) {
308 0 : let warn_duration = Duration::from_secs(10);
309 0 : let mut ticker = tokio::time::interval(warn_duration);
310 :
311 0 : loop {
312 0 : tokio::select! {
313 : _ = ticker.tick() => {
314 : let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
315 : if last_pulled == 0 {
316 : // no broker updates yet
317 : continue;
318 : }
319 :
320 : let now = BrokerStats::now_millis();
321 : if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
322 : let ts = chrono::NaiveDateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
323 : info!("no broker updates for some time, last update: {:?}", ts);
324 : }
325 : }
326 0 : }
327 0 : }
328 : }
|