Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Debug;
3 : use std::future::Future;
4 : use std::sync::Arc;
5 : use std::time::{Duration, Instant};
6 :
7 : use futures::StreamExt;
8 : use futures::stream::FuturesUnordered;
9 : use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
10 : use pageserver_api::models::PageserverUtilization;
11 : use safekeeper_api::models::SafekeeperUtilization;
12 : use safekeeper_client::mgmt_api;
13 : use thiserror::Error;
14 : use tokio_util::sync::CancellationToken;
15 : use utils::id::NodeId;
16 : use utils::logging::SecretString;
17 :
18 : use crate::node::Node;
19 : use crate::safekeeper::Safekeeper;
20 :
21 : struct HeartbeaterTask<Server, State> {
22 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
23 : cancel: CancellationToken,
24 :
25 : state: HashMap<NodeId, State>,
26 :
27 : max_offline_interval: Duration,
28 : max_warming_up_interval: Duration,
29 : http_client: reqwest::Client,
30 : jwt_token: Option<String>,
31 : }
32 :
33 : #[derive(Debug, Clone)]
34 : pub(crate) enum PageserverState {
35 : Available {
36 : last_seen_at: Instant,
37 : utilization: PageserverUtilization,
38 : },
39 : WarmingUp {
40 : started_at: Instant,
41 : },
42 : Offline,
43 : }
44 :
45 : #[derive(Debug, Clone)]
46 : pub(crate) enum SafekeeperState {
47 : Available {
48 : last_seen_at: Instant,
49 : utilization: SafekeeperUtilization,
50 : },
51 : Offline,
52 : }
53 :
54 : #[derive(Debug)]
55 : pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
56 :
57 : #[derive(Debug, Error)]
58 : pub(crate) enum HeartbeaterError {
59 : #[error("Cancelled")]
60 : Cancel,
61 : }
62 :
63 : struct HeartbeatRequest<Server, State> {
64 : servers: Arc<HashMap<NodeId, Server>>,
65 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
66 : }
67 :
68 : pub(crate) struct Heartbeater<Server, State> {
69 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
70 : }
71 :
72 : #[allow(private_bounds)]
73 : impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
74 : where
75 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
76 : {
77 0 : pub(crate) fn new(
78 0 : http_client: reqwest::Client,
79 0 : jwt_token: Option<String>,
80 0 : max_offline_interval: Duration,
81 0 : max_warming_up_interval: Duration,
82 0 : cancel: CancellationToken,
83 0 : ) -> Self {
84 0 : let (sender, receiver) =
85 0 : tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
86 0 : let mut heartbeater = HeartbeaterTask::new(
87 0 : receiver,
88 0 : http_client,
89 0 : jwt_token,
90 0 : max_offline_interval,
91 0 : max_warming_up_interval,
92 0 : cancel,
93 0 : );
94 0 : tokio::task::spawn(async move { heartbeater.run().await });
95 0 :
96 0 : Self { sender }
97 0 : }
98 :
99 0 : pub(crate) async fn heartbeat(
100 0 : &self,
101 0 : servers: Arc<HashMap<NodeId, Server>>,
102 0 : ) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
103 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
104 0 : self.sender
105 0 : .send(HeartbeatRequest {
106 0 : servers,
107 0 : reply: sender,
108 0 : })
109 0 : .map_err(|_| HeartbeaterError::Cancel)?;
110 :
111 0 : receiver
112 0 : .await
113 0 : .map_err(|_| HeartbeaterError::Cancel)
114 0 : .and_then(|x| x)
115 0 : }
116 : }
117 :
118 : impl<Server, State: Debug> HeartbeaterTask<Server, State>
119 : where
120 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
121 : {
122 0 : fn new(
123 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
124 0 : http_client: reqwest::Client,
125 0 : jwt_token: Option<String>,
126 0 : max_offline_interval: Duration,
127 0 : max_warming_up_interval: Duration,
128 0 : cancel: CancellationToken,
129 0 : ) -> Self {
130 0 : Self {
131 0 : receiver,
132 0 : cancel,
133 0 : state: HashMap::new(),
134 0 : max_offline_interval,
135 0 : max_warming_up_interval,
136 0 : http_client,
137 0 : jwt_token,
138 0 : }
139 0 : }
140 0 : async fn run(&mut self) {
141 : loop {
142 0 : tokio::select! {
143 0 : request = self.receiver.recv() => {
144 0 : match request {
145 0 : Some(req) => {
146 0 : if req.reply.is_closed() {
147 : // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
148 0 : continue;
149 0 : }
150 0 : let res = self.heartbeat(req.servers).await;
151 : // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
152 0 : _ = req.reply.send(res);
153 : },
154 0 : None => { return; }
155 : }
156 : },
157 0 : _ = self.cancel.cancelled() => return
158 : }
159 : }
160 0 : }
161 : }
162 :
163 : pub(crate) trait HeartBeat<Server, State> {
164 : fn heartbeat(
165 : &mut self,
166 : pageservers: Arc<HashMap<NodeId, Server>>,
167 : ) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
168 : }
169 :
170 : impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
171 0 : async fn heartbeat(
172 0 : &mut self,
173 0 : pageservers: Arc<HashMap<NodeId, Node>>,
174 0 : ) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
175 0 : let mut new_state = HashMap::new();
176 0 :
177 0 : let mut heartbeat_futs = FuturesUnordered::new();
178 0 : for (node_id, node) in &*pageservers {
179 0 : heartbeat_futs.push({
180 0 : let http_client = self.http_client.clone();
181 0 : let jwt_token = self.jwt_token.clone();
182 0 : let cancel = self.cancel.clone();
183 0 :
184 0 : // Clone the node and mark it as available such that the request
185 0 : // goes through to the pageserver even when the node is marked offline.
186 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
187 0 : let mut node_clone = node.clone();
188 0 : node_clone
189 0 : .set_availability(NodeAvailability::Active(PageserverUtilization::full()));
190 0 :
191 0 : async move {
192 0 : let response = node_clone
193 0 : .with_client_retries(
194 0 : |client| async move { client.get_utilization().await },
195 0 : &http_client,
196 0 : &jwt_token,
197 0 : 3,
198 0 : 3,
199 0 : Duration::from_secs(1),
200 0 : &cancel,
201 0 : )
202 0 : .await;
203 :
204 0 : let response = match response {
205 0 : Some(r) => r,
206 : None => {
207 : // This indicates cancellation of the request.
208 : // We ignore the node in this case.
209 0 : return None;
210 : }
211 : };
212 :
213 0 : let status = if let Ok(utilization) = response {
214 0 : PageserverState::Available {
215 0 : last_seen_at: Instant::now(),
216 0 : utilization,
217 0 : }
218 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
219 0 : node.get_availability()
220 : {
221 0 : PageserverState::WarmingUp {
222 0 : started_at: *last_seen_at,
223 0 : }
224 : } else {
225 0 : PageserverState::Offline
226 : };
227 :
228 0 : Some((*node_id, status))
229 0 : }
230 0 : });
231 0 : }
232 :
233 : loop {
234 0 : let maybe_status = tokio::select! {
235 0 : next = heartbeat_futs.next() => {
236 0 : match next {
237 0 : Some(result) => result,
238 0 : None => { break; }
239 : }
240 : },
241 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
242 : };
243 :
244 0 : if let Some((node_id, status)) = maybe_status {
245 0 : new_state.insert(node_id, status);
246 0 : }
247 : }
248 :
249 0 : let mut warming_up = 0;
250 0 : let mut offline = 0;
251 0 : for state in new_state.values() {
252 0 : match state {
253 0 : PageserverState::WarmingUp { .. } => {
254 0 : warming_up += 1;
255 0 : }
256 0 : PageserverState::Offline { .. } => offline += 1,
257 0 : PageserverState::Available { .. } => {}
258 : }
259 : }
260 :
261 0 : tracing::info!(
262 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
263 0 : new_state.len(),
264 : warming_up,
265 : offline
266 : );
267 :
268 0 : let mut deltas = Vec::new();
269 0 : let now = Instant::now();
270 0 : for (node_id, ps_state) in new_state.iter_mut() {
271 : use std::collections::hash_map::Entry::*;
272 0 : let entry = self.state.entry(*node_id);
273 0 :
274 0 : let mut needs_update = false;
275 0 : match entry {
276 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
277 0 : (PageserverState::Offline, PageserverState::Offline) => {}
278 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
279 0 : if now - *last_seen_at >= self.max_offline_interval {
280 0 : deltas.push((*node_id, ps_state.clone()));
281 0 : needs_update = true;
282 0 : }
283 : }
284 0 : (_, PageserverState::WarmingUp { started_at }) => {
285 0 : if now - *started_at >= self.max_warming_up_interval {
286 0 : *ps_state = PageserverState::Offline;
287 0 : }
288 :
289 0 : deltas.push((*node_id, ps_state.clone()));
290 0 : needs_update = true;
291 : }
292 0 : _ => {
293 0 : deltas.push((*node_id, ps_state.clone()));
294 0 : needs_update = true;
295 0 : }
296 : },
297 0 : Vacant(_) => {
298 0 : // This is a new node. Don't generate a delta for it.
299 0 : deltas.push((*node_id, ps_state.clone()));
300 0 : }
301 : }
302 :
303 0 : match entry {
304 0 : Occupied(mut occ) if needs_update => {
305 0 : (*occ.get_mut()) = ps_state.clone();
306 0 : }
307 0 : Vacant(vac) => {
308 0 : vac.insert(ps_state.clone());
309 0 : }
310 0 : _ => {}
311 : }
312 : }
313 :
314 0 : Ok(AvailablityDeltas(deltas))
315 0 : }
316 : }
317 :
318 : impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
319 0 : async fn heartbeat(
320 0 : &mut self,
321 0 : safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
322 0 : ) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
323 0 : let mut new_state = HashMap::new();
324 0 :
325 0 : let mut heartbeat_futs = FuturesUnordered::new();
326 0 : for (node_id, sk) in &*safekeepers {
327 0 : if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
328 0 : continue;
329 0 : }
330 0 : heartbeat_futs.push({
331 0 : let http_client = self.http_client.clone();
332 0 : let jwt_token = self
333 0 : .jwt_token
334 0 : .as_ref()
335 0 : .map(|t| SecretString::from(t.to_owned()));
336 0 : let cancel = self.cancel.clone();
337 0 :
338 0 : async move {
339 0 : let response = sk
340 0 : .with_client_retries(
341 0 : |client| async move { client.get_utilization().await },
342 0 : &http_client,
343 0 : &jwt_token,
344 0 : 3,
345 0 : 3,
346 0 : Duration::from_secs(1),
347 0 : &cancel,
348 0 : )
349 0 : .await;
350 :
351 0 : let status = match response {
352 0 : Ok(utilization) => SafekeeperState::Available {
353 0 : last_seen_at: Instant::now(),
354 0 : utilization,
355 0 : },
356 : Err(mgmt_api::Error::Cancelled) => {
357 : // This indicates cancellation of the request.
358 : // We ignore the node in this case.
359 0 : return None;
360 : }
361 0 : Err(e) => {
362 0 : tracing::info!(
363 0 : "Marking safekeeper {} at as offline: {e}",
364 0 : sk.base_url()
365 : );
366 0 : SafekeeperState::Offline
367 : }
368 : };
369 :
370 0 : Some((*node_id, status))
371 0 : }
372 0 : });
373 0 : }
374 :
375 : loop {
376 0 : let maybe_status = tokio::select! {
377 0 : next = heartbeat_futs.next() => {
378 0 : match next {
379 0 : Some(result) => result,
380 0 : None => { break; }
381 : }
382 : },
383 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
384 : };
385 :
386 0 : if let Some((node_id, status)) = maybe_status {
387 0 : new_state.insert(node_id, status);
388 0 : }
389 : }
390 :
391 0 : let mut offline = 0;
392 0 : for state in new_state.values() {
393 0 : match state {
394 0 : SafekeeperState::Offline { .. } => offline += 1,
395 0 : SafekeeperState::Available { .. } => {}
396 : }
397 : }
398 :
399 0 : tracing::info!(
400 0 : "Heartbeat round complete for {} safekeepers, {} offline",
401 0 : new_state.len(),
402 : offline
403 : );
404 :
405 0 : let mut deltas = Vec::new();
406 0 : let now = Instant::now();
407 0 : for (node_id, sk_state) in new_state.iter_mut() {
408 : use std::collections::hash_map::Entry::*;
409 0 : let entry = self.state.entry(*node_id);
410 0 :
411 0 : let mut needs_update = false;
412 0 : match entry {
413 0 : Occupied(ref occ) => match (occ.get(), &sk_state) {
414 0 : (SafekeeperState::Offline, SafekeeperState::Offline) => {}
415 0 : (SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
416 0 : if now - *last_seen_at >= self.max_offline_interval {
417 0 : deltas.push((*node_id, sk_state.clone()));
418 0 : needs_update = true;
419 0 : }
420 : }
421 0 : _ => {
422 0 : deltas.push((*node_id, sk_state.clone()));
423 0 : needs_update = true;
424 0 : }
425 : },
426 0 : Vacant(_) => {
427 0 : // This is a new node. Don't generate a delta for it.
428 0 : deltas.push((*node_id, sk_state.clone()));
429 0 : }
430 : }
431 :
432 0 : match entry {
433 0 : Occupied(mut occ) if needs_update => {
434 0 : (*occ.get_mut()) = sk_state.clone();
435 0 : }
436 0 : Vacant(vac) => {
437 0 : vac.insert(sk_state.clone());
438 0 : }
439 0 : _ => {}
440 : }
441 : }
442 :
443 0 : Ok(AvailablityDeltas(deltas))
444 0 : }
445 : }
|