Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use std::{
3 : collections::HashMap,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use pageserver_api::{
10 : controller_api::{NodeAvailability, UtilizationScore},
11 : models::PageserverUtilization,
12 : };
13 :
14 : use thiserror::Error;
15 : use utils::id::NodeId;
16 :
17 : use crate::node::Node;
18 :
19 : struct HeartbeaterTask {
20 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
21 : cancel: CancellationToken,
22 :
23 : state: HashMap<NodeId, PageserverState>,
24 :
25 : max_unavailable_interval: Duration,
26 : jwt_token: Option<String>,
27 : }
28 :
29 : #[derive(Debug, Clone)]
30 : pub(crate) enum PageserverState {
31 : Available {
32 : last_seen_at: Instant,
33 : utilization: PageserverUtilization,
34 : },
35 : Offline,
36 : }
37 :
38 : #[derive(Debug)]
39 : pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
40 :
41 0 : #[derive(Debug, Error)]
42 : pub(crate) enum HeartbeaterError {
43 : #[error("Cancelled")]
44 : Cancel,
45 : }
46 :
47 : struct HeartbeatRequest {
48 : pageservers: Arc<HashMap<NodeId, Node>>,
49 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
50 : }
51 :
52 : pub(crate) struct Heartbeater {
53 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
54 : }
55 :
56 : impl Heartbeater {
57 0 : pub(crate) fn new(
58 0 : jwt_token: Option<String>,
59 0 : max_unavailable_interval: Duration,
60 0 : cancel: CancellationToken,
61 0 : ) -> Self {
62 0 : let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
63 0 : let mut heartbeater =
64 0 : HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel);
65 0 : tokio::task::spawn(async move { heartbeater.run().await });
66 0 :
67 0 : Self { sender }
68 0 : }
69 :
70 0 : pub(crate) async fn heartbeat(
71 0 : &self,
72 0 : pageservers: Arc<HashMap<NodeId, Node>>,
73 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
74 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
75 0 : self.sender
76 0 : .send(HeartbeatRequest {
77 0 : pageservers,
78 0 : reply: sender,
79 0 : })
80 0 : .unwrap();
81 0 :
82 0 : receiver.await.unwrap()
83 0 : }
84 : }
85 :
86 : impl HeartbeaterTask {
87 0 : fn new(
88 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
89 0 : jwt_token: Option<String>,
90 0 : max_unavailable_interval: Duration,
91 0 : cancel: CancellationToken,
92 0 : ) -> Self {
93 0 : Self {
94 0 : receiver,
95 0 : cancel,
96 0 : state: HashMap::new(),
97 0 : max_unavailable_interval,
98 0 : jwt_token,
99 0 : }
100 0 : }
101 :
102 0 : async fn run(&mut self) {
103 0 : loop {
104 0 : tokio::select! {
105 : request = self.receiver.recv() => {
106 : match request {
107 : Some(req) => {
108 : let res = self.heartbeat(req.pageservers).await;
109 : req.reply.send(res).unwrap();
110 : },
111 : None => { return; }
112 : }
113 : },
114 : _ = self.cancel.cancelled() => return
115 0 : }
116 0 : }
117 0 : }
118 :
119 0 : async fn heartbeat(
120 0 : &mut self,
121 0 : pageservers: Arc<HashMap<NodeId, Node>>,
122 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
123 0 : let mut new_state = HashMap::new();
124 0 :
125 0 : let mut heartbeat_futs = FuturesUnordered::new();
126 0 : for (node_id, node) in &*pageservers {
127 0 : heartbeat_futs.push({
128 0 : let jwt_token = self.jwt_token.clone();
129 0 : let cancel = self.cancel.clone();
130 0 :
131 0 : // Clone the node and mark it as available such that the request
132 0 : // goes through to the pageserver even when the node is marked offline.
133 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
134 0 : let mut node = node.clone();
135 0 : node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
136 0 :
137 0 : async move {
138 0 : let response = node
139 0 : .with_client_retries(
140 0 : |client| async move { client.get_utilization().await },
141 0 : &jwt_token,
142 0 : 3,
143 0 : 3,
144 0 : Duration::from_secs(1),
145 0 : &cancel,
146 0 : )
147 0 : .await;
148 :
149 0 : let response = match response {
150 0 : Some(r) => r,
151 : None => {
152 : // This indicates cancellation of the request.
153 : // We ignore the node in this case.
154 0 : return None;
155 : }
156 : };
157 :
158 0 : let status = if let Ok(utilization) = response {
159 0 : PageserverState::Available {
160 0 : last_seen_at: Instant::now(),
161 0 : utilization,
162 0 : }
163 : } else {
164 0 : PageserverState::Offline
165 : };
166 :
167 0 : Some((*node_id, status))
168 0 : }
169 0 : });
170 :
171 : loop {
172 0 : let maybe_status = tokio::select! {
173 : next = heartbeat_futs.next() => {
174 : match next {
175 : Some(result) => result,
176 : None => { break; }
177 : }
178 : },
179 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
180 : };
181 :
182 0 : if let Some((node_id, status)) = maybe_status {
183 0 : new_state.insert(node_id, status);
184 0 : }
185 : }
186 : }
187 0 : tracing::info!(
188 0 : "Heartbeat round complete for {} nodes, {} offline",
189 0 : new_state.len(),
190 0 : new_state
191 0 : .values()
192 0 : .filter(|s| match s {
193 : PageserverState::Available { .. } => {
194 0 : false
195 : }
196 0 : PageserverState::Offline => true,
197 0 : })
198 0 : .count()
199 : );
200 :
201 0 : let mut deltas = Vec::new();
202 0 : let now = Instant::now();
203 0 : for (node_id, ps_state) in new_state {
204 : use std::collections::hash_map::Entry::*;
205 0 : let entry = self.state.entry(node_id);
206 0 :
207 0 : let mut needs_update = false;
208 0 : match entry {
209 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
210 0 : (PageserverState::Offline, PageserverState::Offline) => {}
211 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
212 0 : if now - *last_seen_at >= self.max_unavailable_interval {
213 0 : deltas.push((node_id, ps_state.clone()));
214 0 : needs_update = true;
215 0 : }
216 : }
217 0 : _ => {
218 0 : deltas.push((node_id, ps_state.clone()));
219 0 : needs_update = true;
220 0 : }
221 : },
222 0 : Vacant(_) => {
223 0 : deltas.push((node_id, ps_state.clone()));
224 0 : }
225 : }
226 :
227 0 : match entry {
228 0 : Occupied(mut occ) if needs_update => {
229 0 : (*occ.get_mut()) = ps_state;
230 0 : }
231 0 : Vacant(vac) => {
232 0 : vac.insert(ps_state);
233 0 : }
234 0 : _ => {}
235 : }
236 : }
237 :
238 0 : Ok(AvailablityDeltas(deltas))
239 0 : }
240 : }
|