Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Debug;
3 : use std::future::Future;
4 : use std::sync::Arc;
5 : use std::time::{Duration, Instant};
6 :
7 : use futures::StreamExt;
8 : use futures::stream::FuturesUnordered;
9 : use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
10 : use pageserver_api::models::PageserverUtilization;
11 : use safekeeper_api::models::SafekeeperUtilization;
12 : use safekeeper_client::mgmt_api;
13 : use thiserror::Error;
14 : use tokio_util::sync::CancellationToken;
15 : use utils::id::NodeId;
16 : use utils::logging::SecretString;
17 :
18 : use crate::node::Node;
19 : use crate::safekeeper::Safekeeper;
20 :
21 : struct HeartbeaterTask<Server, State> {
22 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
23 : cancel: CancellationToken,
24 :
25 : state: HashMap<NodeId, State>,
26 :
27 : max_offline_interval: Duration,
28 : max_warming_up_interval: Duration,
29 : jwt_token: Option<String>,
30 : }
31 :
32 : #[derive(Debug, Clone)]
33 : pub(crate) enum PageserverState {
34 : Available {
35 : last_seen_at: Instant,
36 : utilization: PageserverUtilization,
37 : },
38 : WarmingUp {
39 : started_at: Instant,
40 : },
41 : Offline,
42 : }
43 :
44 : #[derive(Debug, Clone)]
45 : pub(crate) enum SafekeeperState {
46 : Available {
47 : last_seen_at: Instant,
48 : utilization: SafekeeperUtilization,
49 : },
50 : Offline,
51 : }
52 :
53 : #[derive(Debug)]
54 : pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
55 :
56 : #[derive(Debug, Error)]
57 : pub(crate) enum HeartbeaterError {
58 : #[error("Cancelled")]
59 : Cancel,
60 : }
61 :
62 : struct HeartbeatRequest<Server, State> {
63 : servers: Arc<HashMap<NodeId, Server>>,
64 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
65 : }
66 :
67 : pub(crate) struct Heartbeater<Server, State> {
68 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
69 : }
70 :
71 : #[allow(private_bounds)]
72 : impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
73 : where
74 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
75 : {
76 0 : pub(crate) fn new(
77 0 : jwt_token: Option<String>,
78 0 : max_offline_interval: Duration,
79 0 : max_warming_up_interval: Duration,
80 0 : cancel: CancellationToken,
81 0 : ) -> Self {
82 0 : let (sender, receiver) =
83 0 : tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
84 0 : let mut heartbeater = HeartbeaterTask::new(
85 0 : receiver,
86 0 : jwt_token,
87 0 : max_offline_interval,
88 0 : max_warming_up_interval,
89 0 : cancel,
90 0 : );
91 0 : tokio::task::spawn(async move { heartbeater.run().await });
92 0 :
93 0 : Self { sender }
94 0 : }
95 :
96 0 : pub(crate) async fn heartbeat(
97 0 : &self,
98 0 : servers: Arc<HashMap<NodeId, Server>>,
99 0 : ) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
100 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
101 0 : self.sender
102 0 : .send(HeartbeatRequest {
103 0 : servers,
104 0 : reply: sender,
105 0 : })
106 0 : .map_err(|_| HeartbeaterError::Cancel)?;
107 :
108 0 : receiver
109 0 : .await
110 0 : .map_err(|_| HeartbeaterError::Cancel)
111 0 : .and_then(|x| x)
112 0 : }
113 : }
114 :
115 : impl<Server, State: Debug> HeartbeaterTask<Server, State>
116 : where
117 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
118 : {
119 0 : fn new(
120 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
121 0 : jwt_token: Option<String>,
122 0 : max_offline_interval: Duration,
123 0 : max_warming_up_interval: Duration,
124 0 : cancel: CancellationToken,
125 0 : ) -> Self {
126 0 : Self {
127 0 : receiver,
128 0 : cancel,
129 0 : state: HashMap::new(),
130 0 : max_offline_interval,
131 0 : max_warming_up_interval,
132 0 : jwt_token,
133 0 : }
134 0 : }
135 0 : async fn run(&mut self) {
136 : loop {
137 0 : tokio::select! {
138 0 : request = self.receiver.recv() => {
139 0 : match request {
140 0 : Some(req) => {
141 0 : if req.reply.is_closed() {
142 : // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
143 0 : continue;
144 0 : }
145 0 : let res = self.heartbeat(req.servers).await;
146 : // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
147 0 : _ = req.reply.send(res);
148 : },
149 0 : None => { return; }
150 : }
151 : },
152 0 : _ = self.cancel.cancelled() => return
153 : }
154 : }
155 0 : }
156 : }
157 :
158 : pub(crate) trait HeartBeat<Server, State> {
159 : fn heartbeat(
160 : &mut self,
161 : pageservers: Arc<HashMap<NodeId, Server>>,
162 : ) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
163 : }
164 :
165 : impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
166 0 : async fn heartbeat(
167 0 : &mut self,
168 0 : pageservers: Arc<HashMap<NodeId, Node>>,
169 0 : ) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
170 0 : let mut new_state = HashMap::new();
171 0 :
172 0 : let mut heartbeat_futs = FuturesUnordered::new();
173 0 : for (node_id, node) in &*pageservers {
174 0 : heartbeat_futs.push({
175 0 : let jwt_token = self.jwt_token.clone();
176 0 : let cancel = self.cancel.clone();
177 0 :
178 0 : // Clone the node and mark it as available such that the request
179 0 : // goes through to the pageserver even when the node is marked offline.
180 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
181 0 : let mut node_clone = node.clone();
182 0 : node_clone
183 0 : .set_availability(NodeAvailability::Active(PageserverUtilization::full()));
184 0 :
185 0 : async move {
186 0 : let response = node_clone
187 0 : .with_client_retries(
188 0 : |client| async move { client.get_utilization().await },
189 0 : &jwt_token,
190 0 : 3,
191 0 : 3,
192 0 : Duration::from_secs(1),
193 0 : &cancel,
194 0 : )
195 0 : .await;
196 :
197 0 : let response = match response {
198 0 : Some(r) => r,
199 : None => {
200 : // This indicates cancellation of the request.
201 : // We ignore the node in this case.
202 0 : return None;
203 : }
204 : };
205 :
206 0 : let status = if let Ok(utilization) = response {
207 0 : PageserverState::Available {
208 0 : last_seen_at: Instant::now(),
209 0 : utilization,
210 0 : }
211 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
212 0 : node.get_availability()
213 : {
214 0 : PageserverState::WarmingUp {
215 0 : started_at: *last_seen_at,
216 0 : }
217 : } else {
218 0 : PageserverState::Offline
219 : };
220 :
221 0 : Some((*node_id, status))
222 0 : }
223 0 : });
224 0 : }
225 :
226 : loop {
227 0 : let maybe_status = tokio::select! {
228 0 : next = heartbeat_futs.next() => {
229 0 : match next {
230 0 : Some(result) => result,
231 0 : None => { break; }
232 : }
233 : },
234 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
235 : };
236 :
237 0 : if let Some((node_id, status)) = maybe_status {
238 0 : new_state.insert(node_id, status);
239 0 : }
240 : }
241 :
242 0 : let mut warming_up = 0;
243 0 : let mut offline = 0;
244 0 : for state in new_state.values() {
245 0 : match state {
246 0 : PageserverState::WarmingUp { .. } => {
247 0 : warming_up += 1;
248 0 : }
249 0 : PageserverState::Offline { .. } => offline += 1,
250 0 : PageserverState::Available { .. } => {}
251 : }
252 : }
253 :
254 0 : tracing::info!(
255 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
256 0 : new_state.len(),
257 : warming_up,
258 : offline
259 : );
260 :
261 0 : let mut deltas = Vec::new();
262 0 : let now = Instant::now();
263 0 : for (node_id, ps_state) in new_state.iter_mut() {
264 : use std::collections::hash_map::Entry::*;
265 0 : let entry = self.state.entry(*node_id);
266 0 :
267 0 : let mut needs_update = false;
268 0 : match entry {
269 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
270 0 : (PageserverState::Offline, PageserverState::Offline) => {}
271 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
272 0 : if now - *last_seen_at >= self.max_offline_interval {
273 0 : deltas.push((*node_id, ps_state.clone()));
274 0 : needs_update = true;
275 0 : }
276 : }
277 0 : (_, PageserverState::WarmingUp { started_at }) => {
278 0 : if now - *started_at >= self.max_warming_up_interval {
279 0 : *ps_state = PageserverState::Offline;
280 0 : }
281 :
282 0 : deltas.push((*node_id, ps_state.clone()));
283 0 : needs_update = true;
284 : }
285 0 : _ => {
286 0 : deltas.push((*node_id, ps_state.clone()));
287 0 : needs_update = true;
288 0 : }
289 : },
290 0 : Vacant(_) => {
291 0 : // This is a new node. Don't generate a delta for it.
292 0 : deltas.push((*node_id, ps_state.clone()));
293 0 : }
294 : }
295 :
296 0 : match entry {
297 0 : Occupied(mut occ) if needs_update => {
298 0 : (*occ.get_mut()) = ps_state.clone();
299 0 : }
300 0 : Vacant(vac) => {
301 0 : vac.insert(ps_state.clone());
302 0 : }
303 0 : _ => {}
304 : }
305 : }
306 :
307 0 : Ok(AvailablityDeltas(deltas))
308 0 : }
309 : }
310 :
311 : impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
312 0 : async fn heartbeat(
313 0 : &mut self,
314 0 : safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
315 0 : ) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
316 0 : let mut new_state = HashMap::new();
317 0 :
318 0 : let mut heartbeat_futs = FuturesUnordered::new();
319 0 : for (node_id, sk) in &*safekeepers {
320 0 : if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
321 0 : continue;
322 0 : }
323 0 : heartbeat_futs.push({
324 0 : let jwt_token = self
325 0 : .jwt_token
326 0 : .as_ref()
327 0 : .map(|t| SecretString::from(t.to_owned()));
328 0 : let cancel = self.cancel.clone();
329 0 :
330 0 : async move {
331 0 : let response = sk
332 0 : .with_client_retries(
333 0 : |client| async move { client.get_utilization().await },
334 0 : &jwt_token,
335 0 : 3,
336 0 : 3,
337 0 : Duration::from_secs(1),
338 0 : &cancel,
339 0 : )
340 0 : .await;
341 :
342 0 : let status = match response {
343 0 : Ok(utilization) => SafekeeperState::Available {
344 0 : last_seen_at: Instant::now(),
345 0 : utilization,
346 0 : },
347 : Err(mgmt_api::Error::Cancelled) => {
348 : // This indicates cancellation of the request.
349 : // We ignore the node in this case.
350 0 : return None;
351 : }
352 0 : Err(e) => {
353 0 : tracing::info!(
354 0 : "Marking safekeeper {} at as offline: {e}",
355 0 : sk.base_url()
356 : );
357 0 : SafekeeperState::Offline
358 : }
359 : };
360 :
361 0 : Some((*node_id, status))
362 0 : }
363 0 : });
364 0 : }
365 :
366 : loop {
367 0 : let maybe_status = tokio::select! {
368 0 : next = heartbeat_futs.next() => {
369 0 : match next {
370 0 : Some(result) => result,
371 0 : None => { break; }
372 : }
373 : },
374 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
375 : };
376 :
377 0 : if let Some((node_id, status)) = maybe_status {
378 0 : new_state.insert(node_id, status);
379 0 : }
380 : }
381 :
382 0 : let mut offline = 0;
383 0 : for state in new_state.values() {
384 0 : match state {
385 0 : SafekeeperState::Offline { .. } => offline += 1,
386 0 : SafekeeperState::Available { .. } => {}
387 : }
388 : }
389 :
390 0 : tracing::info!(
391 0 : "Heartbeat round complete for {} safekeepers, {} offline",
392 0 : new_state.len(),
393 : offline
394 : );
395 :
396 0 : let mut deltas = Vec::new();
397 0 : let now = Instant::now();
398 0 : for (node_id, sk_state) in new_state.iter_mut() {
399 : use std::collections::hash_map::Entry::*;
400 0 : let entry = self.state.entry(*node_id);
401 0 :
402 0 : let mut needs_update = false;
403 0 : match entry {
404 0 : Occupied(ref occ) => match (occ.get(), &sk_state) {
405 0 : (SafekeeperState::Offline, SafekeeperState::Offline) => {}
406 0 : (SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
407 0 : if now - *last_seen_at >= self.max_offline_interval {
408 0 : deltas.push((*node_id, sk_state.clone()));
409 0 : needs_update = true;
410 0 : }
411 : }
412 0 : _ => {
413 0 : deltas.push((*node_id, sk_state.clone()));
414 0 : needs_update = true;
415 0 : }
416 : },
417 0 : Vacant(_) => {
418 0 : // This is a new node. Don't generate a delta for it.
419 0 : deltas.push((*node_id, sk_state.clone()));
420 0 : }
421 : }
422 :
423 0 : match entry {
424 0 : Occupied(mut occ) if needs_update => {
425 0 : (*occ.get_mut()) = sk_state.clone();
426 0 : }
427 0 : Vacant(vac) => {
428 0 : vac.insert(sk_state.clone());
429 0 : }
430 0 : _ => {}
431 : }
432 : }
433 :
434 0 : Ok(AvailablityDeltas(deltas))
435 0 : }
436 : }
|