Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Debug;
3 : use std::future::Future;
4 : use std::sync::Arc;
5 : use std::time::{Duration, Instant};
6 :
7 : use futures::StreamExt;
8 : use futures::stream::FuturesUnordered;
9 : use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
10 : use pageserver_api::models::PageserverUtilization;
11 : use safekeeper_api::models::SafekeeperUtilization;
12 : use safekeeper_client::mgmt_api;
13 : use thiserror::Error;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::Instrument;
16 : use utils::id::NodeId;
17 : use utils::logging::SecretString;
18 :
19 : use crate::node::Node;
20 : use crate::safekeeper::Safekeeper;
21 :
22 : struct HeartbeaterTask<Server, State> {
23 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
24 : cancel: CancellationToken,
25 :
26 : state: HashMap<NodeId, State>,
27 :
28 : max_offline_interval: Duration,
29 : max_warming_up_interval: Duration,
30 : http_client: reqwest::Client,
31 : jwt_token: Option<String>,
32 : }
33 :
34 : #[derive(Debug, Clone)]
35 : pub(crate) enum PageserverState {
36 : Available {
37 : last_seen_at: Instant,
38 : utilization: PageserverUtilization,
39 : },
40 : WarmingUp {
41 : started_at: Instant,
42 : },
43 : Offline,
44 : }
45 :
46 : #[derive(Debug, Clone)]
47 : pub(crate) enum SafekeeperState {
48 : Available {
49 : last_seen_at: Instant,
50 : utilization: SafekeeperUtilization,
51 : },
52 : Offline,
53 : }
54 :
55 : #[derive(Debug)]
56 : pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
57 :
58 : #[derive(Debug, Error)]
59 : pub(crate) enum HeartbeaterError {
60 : #[error("Cancelled")]
61 : Cancel,
62 : }
63 :
64 : struct HeartbeatRequest<Server, State> {
65 : servers: Arc<HashMap<NodeId, Server>>,
66 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
67 : }
68 :
69 : pub(crate) struct Heartbeater<Server, State> {
70 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
71 : }
72 :
73 : #[allow(private_bounds)]
74 : impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
75 : where
76 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
77 : {
78 0 : pub(crate) fn new(
79 0 : http_client: reqwest::Client,
80 0 : jwt_token: Option<String>,
81 0 : max_offline_interval: Duration,
82 0 : max_warming_up_interval: Duration,
83 0 : cancel: CancellationToken,
84 0 : ) -> Self {
85 0 : let (sender, receiver) =
86 0 : tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
87 0 : let mut heartbeater = HeartbeaterTask::new(
88 0 : receiver,
89 0 : http_client,
90 0 : jwt_token,
91 0 : max_offline_interval,
92 0 : max_warming_up_interval,
93 0 : cancel,
94 0 : );
95 0 : tokio::task::spawn(async move { heartbeater.run().await });
96 0 :
97 0 : Self { sender }
98 0 : }
99 :
100 0 : pub(crate) async fn heartbeat(
101 0 : &self,
102 0 : servers: Arc<HashMap<NodeId, Server>>,
103 0 : ) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
104 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
105 0 : self.sender
106 0 : .send(HeartbeatRequest {
107 0 : servers,
108 0 : reply: sender,
109 0 : })
110 0 : .map_err(|_| HeartbeaterError::Cancel)?;
111 :
112 0 : receiver
113 0 : .await
114 0 : .map_err(|_| HeartbeaterError::Cancel)
115 0 : .and_then(|x| x)
116 0 : }
117 : }
118 :
119 : impl<Server, State: Debug> HeartbeaterTask<Server, State>
120 : where
121 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
122 : {
123 0 : fn new(
124 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
125 0 : http_client: reqwest::Client,
126 0 : jwt_token: Option<String>,
127 0 : max_offline_interval: Duration,
128 0 : max_warming_up_interval: Duration,
129 0 : cancel: CancellationToken,
130 0 : ) -> Self {
131 0 : Self {
132 0 : receiver,
133 0 : cancel,
134 0 : state: HashMap::new(),
135 0 : max_offline_interval,
136 0 : max_warming_up_interval,
137 0 : http_client,
138 0 : jwt_token,
139 0 : }
140 0 : }
141 0 : async fn run(&mut self) {
142 : loop {
143 0 : tokio::select! {
144 0 : request = self.receiver.recv() => {
145 0 : match request {
146 0 : Some(req) => {
147 0 : if req.reply.is_closed() {
148 : // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
149 0 : continue;
150 0 : }
151 0 : let res = self.heartbeat(req.servers).await;
152 : // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
153 0 : _ = req.reply.send(res);
154 : },
155 0 : None => { return; }
156 : }
157 : },
158 0 : _ = self.cancel.cancelled() => return
159 : }
160 : }
161 0 : }
162 : }
163 :
164 : pub(crate) trait HeartBeat<Server, State> {
165 : fn heartbeat(
166 : &mut self,
167 : pageservers: Arc<HashMap<NodeId, Server>>,
168 : ) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
169 : }
170 :
171 : impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
172 0 : async fn heartbeat(
173 0 : &mut self,
174 0 : pageservers: Arc<HashMap<NodeId, Node>>,
175 0 : ) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
176 0 : let mut new_state = HashMap::new();
177 0 :
178 0 : let mut heartbeat_futs = FuturesUnordered::new();
179 0 : for (node_id, node) in &*pageservers {
180 0 : heartbeat_futs.push({
181 0 : let http_client = self.http_client.clone();
182 0 : let jwt_token = self.jwt_token.clone();
183 0 : let cancel = self.cancel.clone();
184 0 :
185 0 : // Clone the node and mark it as available such that the request
186 0 : // goes through to the pageserver even when the node is marked offline.
187 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
188 0 : let mut node_clone = node.clone();
189 0 : node_clone
190 0 : .set_availability(NodeAvailability::Active(PageserverUtilization::full()));
191 :
192 0 : async move {
193 0 : let response = node_clone
194 0 : .with_client_retries(
195 0 : |client| async move { client.get_utilization().await },
196 0 : &http_client,
197 0 : &jwt_token,
198 0 : 3,
199 0 : 3,
200 0 : Duration::from_secs(1),
201 0 : &cancel,
202 0 : )
203 0 : .await;
204 :
205 0 : let response = match response {
206 0 : Some(r) => r,
207 : None => {
208 : // This indicates cancellation of the request.
209 : // We ignore the node in this case.
210 0 : return None;
211 : }
212 : };
213 :
214 0 : let status = if let Ok(utilization) = response {
215 0 : PageserverState::Available {
216 0 : last_seen_at: Instant::now(),
217 0 : utilization,
218 0 : }
219 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
220 0 : node.get_availability()
221 : {
222 0 : PageserverState::WarmingUp {
223 0 : started_at: *last_seen_at,
224 0 : }
225 : } else {
226 0 : PageserverState::Offline
227 : };
228 :
229 0 : Some((*node_id, status))
230 0 : }
231 0 : .instrument(tracing::info_span!("heartbeat_ps", %node_id))
232 : });
233 : }
234 :
235 : loop {
236 0 : let maybe_status = tokio::select! {
237 0 : next = heartbeat_futs.next() => {
238 0 : match next {
239 0 : Some(result) => result,
240 0 : None => { break; }
241 : }
242 : },
243 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
244 : };
245 :
246 0 : if let Some((node_id, status)) = maybe_status {
247 0 : new_state.insert(node_id, status);
248 0 : }
249 : }
250 :
251 0 : let mut warming_up = 0;
252 0 : let mut offline = 0;
253 0 : for state in new_state.values() {
254 0 : match state {
255 0 : PageserverState::WarmingUp { .. } => {
256 0 : warming_up += 1;
257 0 : }
258 0 : PageserverState::Offline => offline += 1,
259 0 : PageserverState::Available { .. } => {}
260 : }
261 : }
262 :
263 0 : tracing::info!(
264 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
265 0 : new_state.len(),
266 : warming_up,
267 : offline
268 : );
269 :
270 0 : let mut deltas = Vec::new();
271 0 : let now = Instant::now();
272 0 : for (node_id, ps_state) in new_state.iter_mut() {
273 : use std::collections::hash_map::Entry::*;
274 0 : let entry = self.state.entry(*node_id);
275 0 :
276 0 : let mut needs_update = false;
277 0 : match entry {
278 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
279 0 : (PageserverState::Offline, PageserverState::Offline) => {}
280 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
281 0 : if now - *last_seen_at >= self.max_offline_interval {
282 0 : deltas.push((*node_id, ps_state.clone()));
283 0 : needs_update = true;
284 0 : }
285 : }
286 0 : (_, PageserverState::WarmingUp { started_at }) => {
287 0 : if now - *started_at >= self.max_warming_up_interval {
288 0 : *ps_state = PageserverState::Offline;
289 0 : }
290 :
291 0 : deltas.push((*node_id, ps_state.clone()));
292 0 : needs_update = true;
293 : }
294 0 : _ => {
295 0 : deltas.push((*node_id, ps_state.clone()));
296 0 : needs_update = true;
297 0 : }
298 : },
299 0 : Vacant(_) => {
300 0 : // This is a new node. Don't generate a delta for it.
301 0 : deltas.push((*node_id, ps_state.clone()));
302 0 : }
303 : }
304 :
305 0 : match entry {
306 0 : Occupied(mut occ) if needs_update => {
307 0 : (*occ.get_mut()) = ps_state.clone();
308 0 : }
309 0 : Vacant(vac) => {
310 0 : vac.insert(ps_state.clone());
311 0 : }
312 0 : _ => {}
313 : }
314 : }
315 :
316 0 : Ok(AvailablityDeltas(deltas))
317 0 : }
318 : }
319 :
320 : impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
321 0 : async fn heartbeat(
322 0 : &mut self,
323 0 : safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
324 0 : ) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
325 0 : let mut new_state = HashMap::new();
326 0 :
327 0 : let mut heartbeat_futs = FuturesUnordered::new();
328 0 : for (node_id, sk) in &*safekeepers {
329 0 : if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
330 0 : continue;
331 0 : }
332 0 : heartbeat_futs.push({
333 0 : let http_client = self.http_client.clone();
334 0 : let jwt_token = self
335 0 : .jwt_token
336 0 : .as_ref()
337 0 : .map(|t| SecretString::from(t.to_owned()));
338 0 : let cancel = self.cancel.clone();
339 :
340 0 : async move {
341 0 : let response = sk
342 0 : .with_client_retries(
343 0 : |client| async move { client.get_utilization().await },
344 0 : &http_client,
345 0 : &jwt_token,
346 0 : 3,
347 0 : 3,
348 0 : Duration::from_secs(1),
349 0 : &cancel,
350 0 : )
351 0 : .await;
352 :
353 0 : let status = match response {
354 0 : Ok(utilization) => SafekeeperState::Available {
355 0 : last_seen_at: Instant::now(),
356 0 : utilization,
357 0 : },
358 : Err(mgmt_api::Error::Cancelled) => {
359 : // This indicates cancellation of the request.
360 : // We ignore the node in this case.
361 0 : return None;
362 : }
363 0 : Err(e) => {
364 0 : tracing::info!(
365 0 : "Marking safekeeper {} at as offline: {e}",
366 0 : sk.base_url()
367 : );
368 0 : SafekeeperState::Offline
369 : }
370 : };
371 :
372 0 : Some((*node_id, status))
373 0 : }
374 0 : .instrument(tracing::info_span!("heartbeat_sk", %node_id))
375 : });
376 : }
377 :
378 : loop {
379 0 : let maybe_status = tokio::select! {
380 0 : next = heartbeat_futs.next() => {
381 0 : match next {
382 0 : Some(result) => result,
383 0 : None => { break; }
384 : }
385 : },
386 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
387 : };
388 :
389 0 : if let Some((node_id, status)) = maybe_status {
390 0 : new_state.insert(node_id, status);
391 0 : }
392 : }
393 :
394 0 : let mut offline = 0;
395 0 : for state in new_state.values() {
396 0 : match state {
397 0 : SafekeeperState::Offline => offline += 1,
398 0 : SafekeeperState::Available { .. } => {}
399 : }
400 : }
401 :
402 0 : tracing::info!(
403 0 : "Heartbeat round complete for {} safekeepers, {} offline",
404 0 : new_state.len(),
405 : offline
406 : );
407 :
408 0 : let mut deltas = Vec::new();
409 0 : let now = Instant::now();
410 0 : for (node_id, sk_state) in new_state.iter_mut() {
411 : use std::collections::hash_map::Entry::*;
412 0 : let entry = self.state.entry(*node_id);
413 0 :
414 0 : let mut needs_update = false;
415 0 : match entry {
416 0 : Occupied(ref occ) => match (occ.get(), &sk_state) {
417 0 : (SafekeeperState::Offline, SafekeeperState::Offline) => {}
418 0 : (SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
419 0 : if now - *last_seen_at >= self.max_offline_interval {
420 0 : deltas.push((*node_id, sk_state.clone()));
421 0 : needs_update = true;
422 0 : }
423 : }
424 0 : _ => {
425 0 : deltas.push((*node_id, sk_state.clone()));
426 0 : needs_update = true;
427 0 : }
428 : },
429 0 : Vacant(_) => {
430 0 : // This is a new node. Don't generate a delta for it.
431 0 : deltas.push((*node_id, sk_state.clone()));
432 0 : }
433 : }
434 :
435 0 : match entry {
436 0 : Occupied(mut occ) if needs_update => {
437 0 : (*occ.get_mut()) = sk_state.clone();
438 0 : }
439 0 : Vacant(vac) => {
440 0 : vac.insert(sk_state.clone());
441 0 : }
442 0 : _ => {}
443 : }
444 : }
445 :
446 0 : Ok(AvailablityDeltas(deltas))
447 0 : }
448 : }
|