Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use safekeeper_api::models::SafekeeperUtilization;
3 : use safekeeper_client::mgmt_api;
4 : use std::{
5 : collections::HashMap,
6 : fmt::Debug,
7 : future::Future,
8 : sync::Arc,
9 : time::{Duration, Instant},
10 : };
11 : use tokio_util::sync::CancellationToken;
12 :
13 : use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
14 :
15 : use thiserror::Error;
16 : use utils::{id::NodeId, logging::SecretString};
17 :
18 : use crate::{node::Node, safekeeper::Safekeeper};
19 :
20 : struct HeartbeaterTask<Server, State> {
21 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
22 : cancel: CancellationToken,
23 :
24 : state: HashMap<NodeId, State>,
25 :
26 : max_offline_interval: Duration,
27 : max_warming_up_interval: Duration,
28 : jwt_token: Option<String>,
29 : }
30 :
31 : #[derive(Debug, Clone)]
32 : pub(crate) enum PageserverState {
33 : Available {
34 : last_seen_at: Instant,
35 : utilization: PageserverUtilization,
36 : },
37 : WarmingUp {
38 : started_at: Instant,
39 : },
40 : Offline,
41 : }
42 :
43 : #[derive(Debug, Clone)]
44 : pub(crate) enum SafekeeperState {
45 : Available {
46 : last_seen_at: Instant,
47 : utilization: SafekeeperUtilization,
48 : },
49 : Offline,
50 : }
51 :
52 : #[derive(Debug)]
53 : pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
54 :
55 : #[derive(Debug, Error)]
56 : pub(crate) enum HeartbeaterError {
57 : #[error("Cancelled")]
58 : Cancel,
59 : }
60 :
61 : struct HeartbeatRequest<Server, State> {
62 : servers: Arc<HashMap<NodeId, Server>>,
63 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
64 : }
65 :
66 : pub(crate) struct Heartbeater<Server, State> {
67 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
68 : }
69 :
70 : #[allow(private_bounds)]
71 : impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
72 : where
73 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
74 : {
75 0 : pub(crate) fn new(
76 0 : jwt_token: Option<String>,
77 0 : max_offline_interval: Duration,
78 0 : max_warming_up_interval: Duration,
79 0 : cancel: CancellationToken,
80 0 : ) -> Self {
81 0 : let (sender, receiver) =
82 0 : tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
83 0 : let mut heartbeater = HeartbeaterTask::new(
84 0 : receiver,
85 0 : jwt_token,
86 0 : max_offline_interval,
87 0 : max_warming_up_interval,
88 0 : cancel,
89 0 : );
90 0 : tokio::task::spawn(async move { heartbeater.run().await });
91 0 :
92 0 : Self { sender }
93 0 : }
94 :
95 0 : pub(crate) async fn heartbeat(
96 0 : &self,
97 0 : servers: Arc<HashMap<NodeId, Server>>,
98 0 : ) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
99 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
100 0 : self.sender
101 0 : .send(HeartbeatRequest {
102 0 : servers,
103 0 : reply: sender,
104 0 : })
105 0 : .map_err(|_| HeartbeaterError::Cancel)?;
106 :
107 0 : receiver
108 0 : .await
109 0 : .map_err(|_| HeartbeaterError::Cancel)
110 0 : .and_then(|x| x)
111 0 : }
112 : }
113 :
114 : impl<Server, State: Debug> HeartbeaterTask<Server, State>
115 : where
116 : HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
117 : {
118 0 : fn new(
119 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
120 0 : jwt_token: Option<String>,
121 0 : max_offline_interval: Duration,
122 0 : max_warming_up_interval: Duration,
123 0 : cancel: CancellationToken,
124 0 : ) -> Self {
125 0 : Self {
126 0 : receiver,
127 0 : cancel,
128 0 : state: HashMap::new(),
129 0 : max_offline_interval,
130 0 : max_warming_up_interval,
131 0 : jwt_token,
132 0 : }
133 0 : }
134 0 : async fn run(&mut self) {
135 : loop {
136 0 : tokio::select! {
137 0 : request = self.receiver.recv() => {
138 0 : match request {
139 0 : Some(req) => {
140 0 : let res = self.heartbeat(req.servers).await;
141 0 : req.reply.send(res).unwrap();
142 : },
143 0 : None => { return; }
144 : }
145 : },
146 0 : _ = self.cancel.cancelled() => return
147 : }
148 : }
149 0 : }
150 : }
151 :
152 : pub(crate) trait HeartBeat<Server, State> {
153 : fn heartbeat(
154 : &mut self,
155 : pageservers: Arc<HashMap<NodeId, Server>>,
156 : ) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
157 : }
158 :
159 : impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
160 0 : async fn heartbeat(
161 0 : &mut self,
162 0 : pageservers: Arc<HashMap<NodeId, Node>>,
163 0 : ) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
164 0 : let mut new_state = HashMap::new();
165 0 :
166 0 : let mut heartbeat_futs = FuturesUnordered::new();
167 0 : for (node_id, node) in &*pageservers {
168 0 : heartbeat_futs.push({
169 0 : let jwt_token = self.jwt_token.clone();
170 0 : let cancel = self.cancel.clone();
171 0 :
172 0 : // Clone the node and mark it as available such that the request
173 0 : // goes through to the pageserver even when the node is marked offline.
174 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
175 0 : let mut node_clone = node.clone();
176 0 : node_clone
177 0 : .set_availability(NodeAvailability::Active(PageserverUtilization::full()));
178 0 :
179 0 : async move {
180 0 : let response = node_clone
181 0 : .with_client_retries(
182 0 : |client| async move { client.get_utilization().await },
183 0 : &jwt_token,
184 0 : 3,
185 0 : 3,
186 0 : Duration::from_secs(1),
187 0 : &cancel,
188 0 : )
189 0 : .await;
190 :
191 0 : let response = match response {
192 0 : Some(r) => r,
193 : None => {
194 : // This indicates cancellation of the request.
195 : // We ignore the node in this case.
196 0 : return None;
197 : }
198 : };
199 :
200 0 : let status = if let Ok(utilization) = response {
201 0 : PageserverState::Available {
202 0 : last_seen_at: Instant::now(),
203 0 : utilization,
204 0 : }
205 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
206 0 : node.get_availability()
207 : {
208 0 : PageserverState::WarmingUp {
209 0 : started_at: *last_seen_at,
210 0 : }
211 : } else {
212 0 : PageserverState::Offline
213 : };
214 :
215 0 : Some((*node_id, status))
216 0 : }
217 0 : });
218 :
219 : loop {
220 0 : let maybe_status = tokio::select! {
221 0 : next = heartbeat_futs.next() => {
222 0 : match next {
223 0 : Some(result) => result,
224 0 : None => { break; }
225 : }
226 : },
227 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
228 : };
229 :
230 0 : if let Some((node_id, status)) = maybe_status {
231 0 : new_state.insert(node_id, status);
232 0 : }
233 : }
234 : }
235 :
236 0 : let mut warming_up = 0;
237 0 : let mut offline = 0;
238 0 : for state in new_state.values() {
239 0 : match state {
240 0 : PageserverState::WarmingUp { .. } => {
241 0 : warming_up += 1;
242 0 : }
243 0 : PageserverState::Offline { .. } => offline += 1,
244 0 : PageserverState::Available { .. } => {}
245 : }
246 : }
247 :
248 0 : tracing::info!(
249 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
250 0 : new_state.len(),
251 : warming_up,
252 : offline
253 : );
254 :
255 0 : let mut deltas = Vec::new();
256 0 : let now = Instant::now();
257 0 : for (node_id, ps_state) in new_state.iter_mut() {
258 : use std::collections::hash_map::Entry::*;
259 0 : let entry = self.state.entry(*node_id);
260 0 :
261 0 : let mut needs_update = false;
262 0 : match entry {
263 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
264 0 : (PageserverState::Offline, PageserverState::Offline) => {}
265 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
266 0 : if now - *last_seen_at >= self.max_offline_interval {
267 0 : deltas.push((*node_id, ps_state.clone()));
268 0 : needs_update = true;
269 0 : }
270 : }
271 0 : (_, PageserverState::WarmingUp { started_at }) => {
272 0 : if now - *started_at >= self.max_warming_up_interval {
273 0 : *ps_state = PageserverState::Offline;
274 0 : }
275 :
276 0 : deltas.push((*node_id, ps_state.clone()));
277 0 : needs_update = true;
278 : }
279 0 : _ => {
280 0 : deltas.push((*node_id, ps_state.clone()));
281 0 : needs_update = true;
282 0 : }
283 : },
284 0 : Vacant(_) => {
285 0 : // This is a new node. Don't generate a delta for it.
286 0 : deltas.push((*node_id, ps_state.clone()));
287 0 : }
288 : }
289 :
290 0 : match entry {
291 0 : Occupied(mut occ) if needs_update => {
292 0 : (*occ.get_mut()) = ps_state.clone();
293 0 : }
294 0 : Vacant(vac) => {
295 0 : vac.insert(ps_state.clone());
296 0 : }
297 0 : _ => {}
298 : }
299 : }
300 :
301 0 : Ok(AvailablityDeltas(deltas))
302 0 : }
303 : }
304 :
305 : impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
306 0 : async fn heartbeat(
307 0 : &mut self,
308 0 : safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
309 0 : ) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
310 0 : let mut new_state = HashMap::new();
311 0 :
312 0 : let mut heartbeat_futs = FuturesUnordered::new();
313 0 : for (node_id, sk) in &*safekeepers {
314 0 : heartbeat_futs.push({
315 0 : let jwt_token = self
316 0 : .jwt_token
317 0 : .as_ref()
318 0 : .map(|t| SecretString::from(t.to_owned()));
319 0 : let cancel = self.cancel.clone();
320 0 :
321 0 : async move {
322 0 : let response = sk
323 0 : .with_client_retries(
324 0 : |client| async move { client.get_utilization().await },
325 0 : &jwt_token,
326 0 : 3,
327 0 : 3,
328 0 : Duration::from_secs(1),
329 0 : &cancel,
330 0 : )
331 0 : .await;
332 :
333 0 : let status = match response {
334 0 : Ok(utilization) => SafekeeperState::Available {
335 0 : last_seen_at: Instant::now(),
336 0 : utilization,
337 0 : },
338 : Err(mgmt_api::Error::Cancelled) => {
339 : // This indicates cancellation of the request.
340 : // We ignore the node in this case.
341 0 : return None;
342 : }
343 0 : Err(_) => SafekeeperState::Offline,
344 : };
345 :
346 0 : Some((*node_id, status))
347 0 : }
348 0 : });
349 :
350 : loop {
351 0 : let maybe_status = tokio::select! {
352 0 : next = heartbeat_futs.next() => {
353 0 : match next {
354 0 : Some(result) => result,
355 0 : None => { break; }
356 : }
357 : },
358 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
359 : };
360 :
361 0 : if let Some((node_id, status)) = maybe_status {
362 0 : new_state.insert(node_id, status);
363 0 : }
364 : }
365 : }
366 :
367 0 : let mut offline = 0;
368 0 : for state in new_state.values() {
369 0 : match state {
370 0 : SafekeeperState::Offline { .. } => offline += 1,
371 0 : SafekeeperState::Available { .. } => {}
372 : }
373 : }
374 :
375 0 : tracing::info!(
376 0 : "Heartbeat round complete for {} safekeepers, {} offline",
377 0 : new_state.len(),
378 : offline
379 : );
380 :
381 0 : let mut deltas = Vec::new();
382 0 : let now = Instant::now();
383 0 : for (node_id, sk_state) in new_state.iter_mut() {
384 : use std::collections::hash_map::Entry::*;
385 0 : let entry = self.state.entry(*node_id);
386 0 :
387 0 : let mut needs_update = false;
388 0 : match entry {
389 0 : Occupied(ref occ) => match (occ.get(), &sk_state) {
390 0 : (SafekeeperState::Offline, SafekeeperState::Offline) => {}
391 0 : (SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
392 0 : if now - *last_seen_at >= self.max_offline_interval {
393 0 : deltas.push((*node_id, sk_state.clone()));
394 0 : needs_update = true;
395 0 : }
396 : }
397 0 : _ => {
398 0 : deltas.push((*node_id, sk_state.clone()));
399 0 : needs_update = true;
400 0 : }
401 : },
402 0 : Vacant(_) => {
403 0 : // This is a new node. Don't generate a delta for it.
404 0 : deltas.push((*node_id, sk_state.clone()));
405 0 : }
406 : }
407 :
408 0 : match entry {
409 0 : Occupied(mut occ) if needs_update => {
410 0 : (*occ.get_mut()) = sk_state.clone();
411 0 : }
412 0 : Vacant(vac) => {
413 0 : vac.insert(sk_state.clone());
414 0 : }
415 0 : _ => {}
416 : }
417 : }
418 :
419 0 : Ok(AvailablityDeltas(deltas))
420 0 : }
421 : }
|