Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use std::sync::Arc;
4 : use std::sync::atomic::AtomicU64;
5 : use std::time::{Duration, Instant, UNIX_EPOCH};
6 :
7 : use anyhow::{Context, Error, Result, anyhow, bail};
8 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
9 : use storage_broker::proto::{
10 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryResponse, SubscribeByFilterRequest,
11 : SubscribeSafekeeperInfoRequest, TypeSubscription, TypedMessage,
12 : };
13 : use storage_broker::{Request, parse_proto_ttid};
14 : use tokio::task::JoinHandle;
15 : use tokio::time::sleep;
16 : use tracing::*;
17 :
18 : use crate::metrics::{
19 : BROKER_ITERATION_TIMELINES, BROKER_PULLED_UPDATES, BROKER_PUSH_ALL_UPDATES_SECONDS,
20 : BROKER_PUSHED_UPDATES,
21 : };
22 : use crate::{GlobalTimelines, SafeKeeperConf};
23 :
24 : const RETRY_INTERVAL_MSEC: u64 = 1000;
25 : const PUSH_INTERVAL_MSEC: u64 = 1000;
26 :
27 : /// Push once in a while data about all active timelines to the broker.
28 0 : async fn push_loop(
29 0 : conf: Arc<SafeKeeperConf>,
30 0 : global_timelines: Arc<GlobalTimelines>,
31 0 : ) -> anyhow::Result<()> {
32 0 : if conf.disable_periodic_broker_push {
33 0 : info!("broker push_loop is disabled, doing nothing...");
34 0 : futures::future::pending::<()>().await; // sleep forever
35 0 : return Ok(());
36 0 : }
37 0 :
38 0 : let active_timelines_set = global_timelines.get_global_broker_active_set();
39 :
40 0 : let mut client =
41 0 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
42 0 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
43 0 :
44 0 : let outbound = async_stream::stream! {
45 0 : loop {
46 0 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
47 0 : // is under plain mutex. That's ok, all this code is not performance
48 0 : // sensitive and there is no risk of deadlock as we don't await while
49 0 : // lock is held.
50 0 : let now = Instant::now();
51 0 : let all_tlis = active_timelines_set.get_all();
52 0 : let mut n_pushed_tlis = 0;
53 0 : for tli in &all_tlis {
54 0 : let sk_info = tli.get_safekeeper_info(&conf).await;
55 0 : yield sk_info;
56 0 : BROKER_PUSHED_UPDATES.inc();
57 0 : n_pushed_tlis += 1;
58 0 : }
59 0 : let elapsed = now.elapsed();
60 0 :
61 0 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
62 0 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
63 0 :
64 0 : if elapsed > push_interval / 2 {
65 0 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
66 0 : }
67 0 :
68 0 : sleep(push_interval).await;
69 0 : }
70 0 : };
71 0 : client
72 0 : .publish_safekeeper_info(Request::new(outbound))
73 0 : .await?;
74 0 : Ok(())
75 0 : }
76 :
77 : /// Subscribe and fetch all the interesting data from the broker.
78 : #[instrument(name = "broker_pull", skip_all)]
79 : async fn pull_loop(
80 : conf: Arc<SafeKeeperConf>,
81 : global_timelines: Arc<GlobalTimelines>,
82 : stats: Arc<BrokerStats>,
83 : ) -> Result<()> {
84 : let mut client =
85 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
86 :
87 : // TODO: subscribe only to local timelines instead of all
88 : let request = SubscribeSafekeeperInfoRequest {
89 : subscription_key: Some(ProtoSubscriptionKey::All(())),
90 : };
91 :
92 : let mut stream = client
93 : .subscribe_safekeeper_info(request)
94 : .await
95 : .context("subscribe_safekeper_info request failed")?
96 : .into_inner();
97 :
98 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
99 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
100 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
101 :
102 : while let Some(msg) = stream.message().await? {
103 : stats.update_pulled();
104 :
105 : let proto_ttid = msg
106 : .tenant_timeline_id
107 : .as_ref()
108 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
109 : let ttid = parse_proto_ttid(proto_ttid)?;
110 : if let Ok(tli) = global_timelines.get(ttid) {
111 : // Note that we also receive *our own* info. That's
112 : // important, as it is used as an indication of live
113 : // connection to the broker.
114 :
115 : // note: there are blocking operations below, but it's considered fine for now
116 : let res = tli.record_safekeeper_info(msg).await;
117 : if res.is_ok() {
118 : ok_counter.inc();
119 : } else {
120 : err_counter.inc();
121 : }
122 : res?;
123 : } else {
124 : not_found.inc();
125 : }
126 : }
127 : bail!("end of stream");
128 : }
129 :
130 : /// Process incoming discover requests. This is done in a separate task to avoid
131 : /// interfering with the normal pull/push loops.
132 0 : async fn discover_loop(
133 0 : conf: Arc<SafeKeeperConf>,
134 0 : global_timelines: Arc<GlobalTimelines>,
135 0 : stats: Arc<BrokerStats>,
136 0 : ) -> Result<()> {
137 0 : let mut client =
138 0 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
139 :
140 0 : let request = SubscribeByFilterRequest {
141 0 : types: vec![TypeSubscription {
142 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
143 0 : }],
144 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
145 0 : enabled: false,
146 0 : tenant_timeline_id: None,
147 0 : }),
148 0 : };
149 :
150 0 : let mut stream = client
151 0 : .subscribe_by_filter(request)
152 0 : .await
153 0 : .context("subscribe_by_filter request failed")?
154 0 : .into_inner();
155 0 :
156 0 : let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
157 :
158 0 : while let Some(typed_msg) = stream.message().await? {
159 0 : stats.update_pulled();
160 0 :
161 0 : match typed_msg.r#type() {
162 : MessageType::SafekeeperDiscoveryRequest => {
163 0 : let msg = typed_msg
164 0 : .safekeeper_discovery_request
165 0 : .expect("proto type mismatch from broker message");
166 :
167 0 : let proto_ttid = msg
168 0 : .tenant_timeline_id
169 0 : .as_ref()
170 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
171 0 : let ttid = parse_proto_ttid(proto_ttid)?;
172 0 : if let Ok(tli) = global_timelines.get(ttid) {
173 : // we received a discovery request for a timeline we know about
174 0 : discover_counter.inc();
175 :
176 : // create and reply with discovery response
177 0 : let sk_info = tli.get_safekeeper_info(&conf).await;
178 0 : let response = SafekeeperDiscoveryResponse {
179 0 : safekeeper_id: sk_info.safekeeper_id,
180 0 : tenant_timeline_id: sk_info.tenant_timeline_id,
181 0 : commit_lsn: sk_info.commit_lsn,
182 0 : safekeeper_connstr: sk_info.safekeeper_connstr,
183 0 : availability_zone: sk_info.availability_zone,
184 0 : standby_horizon: 0,
185 0 : };
186 0 :
187 0 : // note this is a blocking call
188 0 : client
189 0 : .publish_one(TypedMessage {
190 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
191 0 : safekeeper_timeline_info: None,
192 0 : safekeeper_discovery_request: None,
193 0 : safekeeper_discovery_response: Some(response),
194 0 : })
195 0 : .await?;
196 0 : }
197 : }
198 :
199 : _ => {
200 0 : warn!(
201 0 : "unexpected message type i32 {}, {:?}",
202 0 : typed_msg.r#type,
203 0 : typed_msg.r#type()
204 : );
205 : }
206 : }
207 : }
208 0 : bail!("end of stream");
209 0 : }
210 :
211 0 : pub async fn task_main(
212 0 : conf: Arc<SafeKeeperConf>,
213 0 : global_timelines: Arc<GlobalTimelines>,
214 0 : ) -> anyhow::Result<()> {
215 0 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
216 :
217 0 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
218 0 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
219 0 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
220 0 : let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
221 0 :
222 0 : let stats = Arc::new(BrokerStats::new());
223 0 : let stats_task = task_stats(stats.clone());
224 0 : tokio::pin!(stats_task);
225 :
226 : // Selecting on JoinHandles requires some squats; is there a better way to
227 : // reap tasks individually?
228 :
229 : // Handling failures in task itself won't catch panic and in Tokio, task's
230 : // panic doesn't kill the whole executor, so it is better to do reaping
231 : // here.
232 : loop {
233 0 : tokio::select! {
234 0 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
235 : // was it panic or normal error?
236 0 : let err = match res {
237 0 : Ok(res_internal) => res_internal.unwrap_err(),
238 0 : Err(err_outer) => err_outer.into(),
239 : };
240 0 : warn!("push task failed: {:?}", err);
241 0 : push_handle = None;
242 : },
243 0 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
244 : // was it panic or normal error?
245 0 : match res {
246 0 : Ok(res_internal) => if let Err(err_inner) = res_internal {
247 0 : warn!("pull task failed: {:?}", err_inner);
248 0 : }
249 0 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
250 : };
251 0 : pull_handle = None;
252 : },
253 0 : res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
254 : // was it panic or normal error?
255 0 : match res {
256 0 : Ok(res_internal) => if let Err(err_inner) = res_internal {
257 0 : warn!("discover task failed: {:?}", err_inner);
258 0 : }
259 0 : Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
260 : };
261 0 : discover_handle = None;
262 : },
263 0 : _ = ticker.tick() => {
264 0 : if push_handle.is_none() {
265 0 : push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
266 0 : }
267 0 : if pull_handle.is_none() {
268 0 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
269 0 : }
270 0 : if discover_handle.is_none() {
271 0 : discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
272 0 : }
273 : },
274 0 : _ = &mut stats_task => {}
275 : }
276 : }
277 : }
278 :
/// Bookkeeping about messages received from the broker.
struct BrokerStats {
    /// Wall-clock time of the last message received from the broker, in
    /// milliseconds since the Unix epoch. Starts at 0, meaning nothing has been
    /// received yet.
    last_pulled_ts: AtomicU64,
}

impl BrokerStats {
    /// Creates stats with no message recorded (timestamp 0).
    fn new() -> Self {
        Self {
            last_pulled_ts: AtomicU64::new(0),
        }
    }

    /// Returns the current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the Unix epoch.
    fn now_millis() -> u64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time is before epoch");
        since_epoch.as_millis() as u64
    }

    /// Update last_pulled timestamp to current time.
    fn update_pulled(&self) {
        use std::sync::atomic::Ordering;

        let now = Self::now_millis();
        self.last_pulled_ts.store(now, Ordering::Relaxed);
    }
}
304 :
305 : /// Periodically write to logs if there are issues with receiving data from the broker.
306 0 : async fn task_stats(stats: Arc<BrokerStats>) {
307 0 : let warn_duration = Duration::from_secs(10);
308 0 : let mut ticker = tokio::time::interval(warn_duration);
309 :
310 : loop {
311 0 : tokio::select! {
312 0 : _ = ticker.tick() => {
313 0 : let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
314 0 : if last_pulled == 0 {
315 : // no broker updates yet
316 0 : continue;
317 0 : }
318 0 :
319 0 : let now = BrokerStats::now_millis();
320 0 : if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
321 0 : let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
322 0 : info!("no broker updates for some time, last update: {:?}", ts);
323 0 : }
324 : }
325 : }
326 : }
327 : }
|