Line data Source code
1 : //! Communication with the broker, providing safekeeper peers and pageserver coordination.
2 :
3 : use std::sync::Arc;
4 : use std::sync::atomic::AtomicU64;
5 : use std::time::{Duration, Instant, UNIX_EPOCH};
6 :
7 : use anyhow::{Context, Error, Result, anyhow, bail};
8 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
9 : use storage_broker::proto::{
10 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryResponse, SubscribeByFilterRequest,
11 : SubscribeSafekeeperInfoRequest, TypeSubscription, TypedMessage,
12 : };
13 : use storage_broker::{Request, parse_proto_ttid};
14 : use tokio::task::JoinHandle;
15 : use tokio::time::sleep;
16 : use tracing::*;
17 :
18 : use crate::metrics::{
19 : BROKER_ITERATION_TIMELINES, BROKER_PULLED_UPDATES, BROKER_PUSH_ALL_UPDATES_SECONDS,
20 : BROKER_PUSHED_UPDATES,
21 : };
22 : use crate::{GlobalTimelines, SafeKeeperConf};
23 :
/// Period, in milliseconds, of the ticker in `task_main` that respawns any
/// broker task whose handle is currently empty.
const RETRY_INTERVAL_MSEC: u64 = 1_000;
/// Pause, in milliseconds, that `push_loop` takes between two rounds of
/// publishing timeline info to the broker.
const PUSH_INTERVAL_MSEC: u64 = 1_000;
26 :
27 0 : fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
28 0 : storage_broker::ClientTlsConfig::new().ca_certificates(
29 0 : conf.ssl_ca_certs
30 0 : .iter()
31 0 : .map(pem::encode)
32 0 : .map(storage_broker::Certificate::from_pem),
33 0 : )
34 0 : }
35 :
36 : /// Push once in a while data about all active timelines to the broker.
37 0 : async fn push_loop(
38 0 : conf: Arc<SafeKeeperConf>,
39 0 : global_timelines: Arc<GlobalTimelines>,
40 0 : ) -> anyhow::Result<()> {
41 0 : if conf.disable_periodic_broker_push {
42 0 : info!("broker push_loop is disabled, doing nothing...");
43 0 : futures::future::pending::<()>().await; // sleep forever
44 0 : return Ok(());
45 0 : }
46 0 :
47 0 : let active_timelines_set = global_timelines.get_global_broker_active_set();
48 :
49 0 : let mut client = storage_broker::connect(
50 0 : conf.broker_endpoint.clone(),
51 0 : conf.broker_keepalive_interval,
52 0 : make_tls_config(&conf),
53 0 : )?;
54 0 : let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
55 0 :
56 0 : let outbound = async_stream::stream! {
57 0 : loop {
58 0 : // Note: we lock runtime here and in timeline methods as GlobalTimelines
59 0 : // is under plain mutex. That's ok, all this code is not performance
60 0 : // sensitive and there is no risk of deadlock as we don't await while
61 0 : // lock is held.
62 0 : let now = Instant::now();
63 0 : let all_tlis = active_timelines_set.get_all();
64 0 : let mut n_pushed_tlis = 0;
65 0 : for tli in &all_tlis {
66 0 : let sk_info = tli.get_safekeeper_info(&conf).await;
67 0 : yield sk_info;
68 0 : BROKER_PUSHED_UPDATES.inc();
69 0 : n_pushed_tlis += 1;
70 0 : }
71 0 : let elapsed = now.elapsed();
72 0 :
73 0 : BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
74 0 : BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
75 0 :
76 0 : if elapsed > push_interval / 2 {
77 0 : info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
78 0 : }
79 0 :
80 0 : sleep(push_interval).await;
81 0 : }
82 0 : };
83 0 : client
84 0 : .publish_safekeeper_info(Request::new(outbound))
85 0 : .await?;
86 0 : Ok(())
87 0 : }
88 :
89 : /// Subscribe and fetch all the interesting data from the broker.
90 : #[instrument(name = "broker_pull", skip_all)]
91 : async fn pull_loop(
92 : conf: Arc<SafeKeeperConf>,
93 : global_timelines: Arc<GlobalTimelines>,
94 : stats: Arc<BrokerStats>,
95 : ) -> Result<()> {
96 : let mut client = storage_broker::connect(
97 : conf.broker_endpoint.clone(),
98 : conf.broker_keepalive_interval,
99 : make_tls_config(&conf),
100 : )?;
101 :
102 : // TODO: subscribe only to local timelines instead of all
103 : let request = SubscribeSafekeeperInfoRequest {
104 : subscription_key: Some(ProtoSubscriptionKey::All(())),
105 : };
106 :
107 : let mut stream = client
108 : .subscribe_safekeeper_info(request)
109 : .await
110 : .context("subscribe_safekeper_info request failed")?
111 : .into_inner();
112 :
113 : let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
114 : let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
115 : let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
116 :
117 : while let Some(msg) = stream.message().await? {
118 : stats.update_pulled();
119 :
120 : let proto_ttid = msg
121 : .tenant_timeline_id
122 : .as_ref()
123 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
124 : let ttid = parse_proto_ttid(proto_ttid)?;
125 : if let Ok(tli) = global_timelines.get(ttid) {
126 : // Note that we also receive *our own* info. That's
127 : // important, as it is used as an indication of live
128 : // connection to the broker.
129 :
130 : // note: there are blocking operations below, but it's considered fine for now
131 : let res = tli.record_safekeeper_info(msg).await;
132 : if res.is_ok() {
133 : ok_counter.inc();
134 : } else {
135 : err_counter.inc();
136 : }
137 : res?;
138 : } else {
139 : not_found.inc();
140 : }
141 : }
142 : bail!("end of stream");
143 : }
144 :
145 : /// Process incoming discover requests. This is done in a separate task to avoid
146 : /// interfering with the normal pull/push loops.
147 0 : async fn discover_loop(
148 0 : conf: Arc<SafeKeeperConf>,
149 0 : global_timelines: Arc<GlobalTimelines>,
150 0 : stats: Arc<BrokerStats>,
151 0 : ) -> Result<()> {
152 0 : let mut client = storage_broker::connect(
153 0 : conf.broker_endpoint.clone(),
154 0 : conf.broker_keepalive_interval,
155 0 : make_tls_config(&conf),
156 0 : )?;
157 :
158 0 : let request = SubscribeByFilterRequest {
159 0 : types: vec![TypeSubscription {
160 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
161 0 : }],
162 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
163 0 : enabled: false,
164 0 : tenant_timeline_id: None,
165 0 : }),
166 0 : };
167 :
168 0 : let mut stream = client
169 0 : .subscribe_by_filter(request)
170 0 : .await
171 0 : .context("subscribe_by_filter request failed")?
172 0 : .into_inner();
173 0 :
174 0 : let discover_counter = BROKER_PULLED_UPDATES.with_label_values(&["discover"]);
175 :
176 0 : while let Some(typed_msg) = stream.message().await? {
177 0 : stats.update_pulled();
178 0 :
179 0 : match typed_msg.r#type() {
180 : MessageType::SafekeeperDiscoveryRequest => {
181 0 : let msg = typed_msg
182 0 : .safekeeper_discovery_request
183 0 : .expect("proto type mismatch from broker message");
184 :
185 0 : let proto_ttid = msg
186 0 : .tenant_timeline_id
187 0 : .as_ref()
188 0 : .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
189 0 : let ttid = parse_proto_ttid(proto_ttid)?;
190 0 : if let Ok(tli) = global_timelines.get(ttid) {
191 : // we received a discovery request for a timeline we know about
192 0 : discover_counter.inc();
193 :
194 : // create and reply with discovery response
195 0 : let sk_info = tli.get_safekeeper_info(&conf).await;
196 0 : let response = SafekeeperDiscoveryResponse {
197 0 : safekeeper_id: sk_info.safekeeper_id,
198 0 : tenant_timeline_id: sk_info.tenant_timeline_id,
199 0 : commit_lsn: sk_info.commit_lsn,
200 0 : safekeeper_connstr: sk_info.safekeeper_connstr,
201 0 : availability_zone: sk_info.availability_zone,
202 0 : standby_horizon: 0,
203 0 : };
204 0 :
205 0 : // note this is a blocking call
206 0 : client
207 0 : .publish_one(TypedMessage {
208 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
209 0 : safekeeper_timeline_info: None,
210 0 : safekeeper_discovery_request: None,
211 0 : safekeeper_discovery_response: Some(response),
212 0 : })
213 0 : .await?;
214 0 : }
215 : }
216 :
217 : _ => {
218 0 : warn!(
219 0 : "unexpected message type i32 {}, {:?}",
220 0 : typed_msg.r#type,
221 0 : typed_msg.r#type()
222 : );
223 : }
224 : }
225 : }
226 0 : bail!("end of stream");
227 0 : }
228 :
229 0 : pub async fn task_main(
230 0 : conf: Arc<SafeKeeperConf>,
231 0 : global_timelines: Arc<GlobalTimelines>,
232 0 : ) -> anyhow::Result<()> {
233 0 : info!("started, broker endpoint {:?}", conf.broker_endpoint);
234 :
235 0 : let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
236 0 : let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
237 0 : let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
238 0 : let mut discover_handle: Option<JoinHandle<Result<(), Error>>> = None;
239 0 :
240 0 : let stats = Arc::new(BrokerStats::new());
241 0 : let stats_task = task_stats(stats.clone());
242 0 : tokio::pin!(stats_task);
243 :
244 : // Selecting on JoinHandles requires some squats; is there a better way to
245 : // reap tasks individually?
246 :
247 : // Handling failures in task itself won't catch panic and in Tokio, task's
248 : // panic doesn't kill the whole executor, so it is better to do reaping
249 : // here.
250 : loop {
251 0 : tokio::select! {
252 0 : res = async { push_handle.as_mut().unwrap().await }, if push_handle.is_some() => {
253 : // was it panic or normal error?
254 0 : let err = match res {
255 0 : Ok(res_internal) => res_internal.unwrap_err(),
256 0 : Err(err_outer) => err_outer.into(),
257 : };
258 0 : warn!("push task failed: {:?}", err);
259 0 : push_handle = None;
260 : },
261 0 : res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
262 : // was it panic or normal error?
263 0 : match res {
264 0 : Ok(res_internal) => if let Err(err_inner) = res_internal {
265 0 : warn!("pull task failed: {:?}", err_inner);
266 0 : }
267 0 : Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
268 : };
269 0 : pull_handle = None;
270 : },
271 0 : res = async { discover_handle.as_mut().unwrap().await }, if discover_handle.is_some() => {
272 : // was it panic or normal error?
273 0 : match res {
274 0 : Ok(res_internal) => if let Err(err_inner) = res_internal {
275 0 : warn!("discover task failed: {:?}", err_inner);
276 0 : }
277 0 : Err(err_outer) => { warn!("discover task panicked: {:?}", err_outer) }
278 : };
279 0 : discover_handle = None;
280 : },
281 0 : _ = ticker.tick() => {
282 0 : if push_handle.is_none() {
283 0 : push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone())));
284 0 : }
285 0 : if pull_handle.is_none() {
286 0 : pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone())));
287 0 : }
288 0 : if discover_handle.is_none() {
289 0 : discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone())));
290 0 : }
291 : },
292 0 : _ = &mut stats_task => {}
293 : }
294 : }
295 : }
296 :
/// Bookkeeping about traffic received from the broker.
struct BrokerStats {
    /// Unix time in milliseconds of the most recent message received from the
    /// broker; stays at zero until `update_pulled` is first called.
    last_pulled_ts: AtomicU64,
}

impl BrokerStats {
    /// Creates stats with no message recorded yet.
    fn new() -> Self {
        Self {
            last_pulled_ts: AtomicU64::new(0),
        }
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the epoch.
    fn now_millis() -> u64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time is before epoch");
        since_epoch.as_millis() as u64
    }

    /// Stamps `last_pulled_ts` with the current time.
    fn update_pulled(&self) {
        use std::sync::atomic::Ordering;

        let now = Self::now_millis();
        self.last_pulled_ts.store(now, Ordering::Relaxed);
    }
}
322 :
323 : /// Periodically write to logs if there are issues with receiving data from the broker.
324 0 : async fn task_stats(stats: Arc<BrokerStats>) {
325 0 : let warn_duration = Duration::from_secs(10);
326 0 : let mut ticker = tokio::time::interval(warn_duration);
327 :
328 : loop {
329 0 : tokio::select! {
330 0 : _ = ticker.tick() => {
331 0 : let last_pulled = stats.last_pulled_ts.load(std::sync::atomic::Ordering::SeqCst);
332 0 : if last_pulled == 0 {
333 : // no broker updates yet
334 0 : continue;
335 0 : }
336 0 :
337 0 : let now = BrokerStats::now_millis();
338 0 : if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
339 0 : let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
340 0 : info!("no broker updates for some time, last update: {:?}", ts);
341 0 : }
342 : }
343 : }
344 : }
345 : }
|