Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use std::{
3 : collections::HashMap,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use pageserver_api::{
10 : controller_api::{NodeAvailability, UtilizationScore},
11 : models::PageserverUtilization,
12 : };
13 :
14 : use thiserror::Error;
15 : use utils::id::NodeId;
16 :
17 : use crate::node::Node;
18 :
19 : struct HeartbeaterTask {
20 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
21 : cancel: CancellationToken,
22 :
23 : state: HashMap<NodeId, PageserverState>,
24 :
25 : max_offline_interval: Duration,
26 : max_warming_up_interval: Duration,
27 : jwt_token: Option<String>,
28 : }
29 :
30 : #[derive(Debug, Clone)]
31 : pub(crate) enum PageserverState {
32 : Available {
33 : last_seen_at: Instant,
34 : utilization: PageserverUtilization,
35 : },
36 : WarmingUp {
37 : started_at: Instant,
38 : },
39 : Offline,
40 : }
41 :
42 : #[derive(Debug)]
43 : pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
44 :
45 0 : #[derive(Debug, Error)]
46 : pub(crate) enum HeartbeaterError {
47 : #[error("Cancelled")]
48 : Cancel,
49 : }
50 :
51 : struct HeartbeatRequest {
52 : pageservers: Arc<HashMap<NodeId, Node>>,
53 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
54 : }
55 :
56 : pub(crate) struct Heartbeater {
57 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
58 : }
59 :
60 : impl Heartbeater {
61 0 : pub(crate) fn new(
62 0 : jwt_token: Option<String>,
63 0 : max_offline_interval: Duration,
64 0 : max_warming_up_interval: Duration,
65 0 : cancel: CancellationToken,
66 0 : ) -> Self {
67 0 : let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
68 0 : let mut heartbeater = HeartbeaterTask::new(
69 0 : receiver,
70 0 : jwt_token,
71 0 : max_offline_interval,
72 0 : max_warming_up_interval,
73 0 : cancel,
74 0 : );
75 0 : tokio::task::spawn(async move { heartbeater.run().await });
76 0 :
77 0 : Self { sender }
78 0 : }
79 :
80 0 : pub(crate) async fn heartbeat(
81 0 : &self,
82 0 : pageservers: Arc<HashMap<NodeId, Node>>,
83 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
84 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
85 0 : self.sender
86 0 : .send(HeartbeatRequest {
87 0 : pageservers,
88 0 : reply: sender,
89 0 : })
90 0 : .map_err(|_| HeartbeaterError::Cancel)?;
91 :
92 0 : receiver
93 0 : .await
94 0 : .map_err(|_| HeartbeaterError::Cancel)
95 0 : .and_then(|x| x)
96 0 : }
97 : }
98 :
99 : impl HeartbeaterTask {
100 0 : fn new(
101 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
102 0 : jwt_token: Option<String>,
103 0 : max_offline_interval: Duration,
104 0 : max_warming_up_interval: Duration,
105 0 : cancel: CancellationToken,
106 0 : ) -> Self {
107 0 : Self {
108 0 : receiver,
109 0 : cancel,
110 0 : state: HashMap::new(),
111 0 : max_offline_interval,
112 0 : max_warming_up_interval,
113 0 : jwt_token,
114 0 : }
115 0 : }
116 :
117 0 : async fn run(&mut self) {
118 0 : loop {
119 0 : tokio::select! {
120 : request = self.receiver.recv() => {
121 : match request {
122 : Some(req) => {
123 : let res = self.heartbeat(req.pageservers).await;
124 : req.reply.send(res).unwrap();
125 : },
126 : None => { return; }
127 : }
128 : },
129 : _ = self.cancel.cancelled() => return
130 0 : }
131 0 : }
132 0 : }
133 :
134 0 : async fn heartbeat(
135 0 : &mut self,
136 0 : pageservers: Arc<HashMap<NodeId, Node>>,
137 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
138 0 : let mut new_state = HashMap::new();
139 0 :
140 0 : let mut heartbeat_futs = FuturesUnordered::new();
141 0 : for (node_id, node) in &*pageservers {
142 0 : heartbeat_futs.push({
143 0 : let jwt_token = self.jwt_token.clone();
144 0 : let cancel = self.cancel.clone();
145 0 :
146 0 : // Clone the node and mark it as available such that the request
147 0 : // goes through to the pageserver even when the node is marked offline.
148 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
149 0 : let mut node_clone = node.clone();
150 0 : node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
151 0 :
152 0 : async move {
153 0 : let response = node_clone
154 0 : .with_client_retries(
155 0 : |client| async move { client.get_utilization().await },
156 0 : &jwt_token,
157 0 : 3,
158 0 : 3,
159 0 : Duration::from_secs(1),
160 0 : &cancel,
161 0 : )
162 0 : .await;
163 :
164 0 : let response = match response {
165 0 : Some(r) => r,
166 : None => {
167 : // This indicates cancellation of the request.
168 : // We ignore the node in this case.
169 0 : return None;
170 : }
171 : };
172 :
173 0 : let status = if let Ok(utilization) = response {
174 0 : PageserverState::Available {
175 0 : last_seen_at: Instant::now(),
176 0 : utilization,
177 0 : }
178 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
179 0 : node.get_availability()
180 : {
181 0 : PageserverState::WarmingUp {
182 0 : started_at: last_seen_at,
183 0 : }
184 : } else {
185 0 : PageserverState::Offline
186 : };
187 :
188 0 : Some((*node_id, status))
189 0 : }
190 0 : });
191 :
192 : loop {
193 0 : let maybe_status = tokio::select! {
194 : next = heartbeat_futs.next() => {
195 : match next {
196 : Some(result) => result,
197 : None => { break; }
198 : }
199 : },
200 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
201 : };
202 :
203 0 : if let Some((node_id, status)) = maybe_status {
204 0 : new_state.insert(node_id, status);
205 0 : }
206 : }
207 : }
208 :
209 0 : let mut warming_up = 0;
210 0 : let mut offline = 0;
211 0 : for state in new_state.values() {
212 0 : match state {
213 0 : PageserverState::WarmingUp { .. } => {
214 0 : warming_up += 1;
215 0 : }
216 0 : PageserverState::Offline { .. } => offline += 1,
217 0 : PageserverState::Available { .. } => {}
218 : }
219 : }
220 :
221 0 : tracing::info!(
222 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
223 0 : new_state.len(),
224 : warming_up,
225 : offline
226 : );
227 :
228 0 : let mut deltas = Vec::new();
229 0 : let now = Instant::now();
230 0 : for (node_id, ps_state) in new_state.iter_mut() {
231 : use std::collections::hash_map::Entry::*;
232 0 : let entry = self.state.entry(*node_id);
233 0 :
234 0 : let mut needs_update = false;
235 0 : match entry {
236 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
237 0 : (PageserverState::Offline, PageserverState::Offline) => {}
238 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
239 0 : if now - *last_seen_at >= self.max_offline_interval {
240 0 : deltas.push((*node_id, ps_state.clone()));
241 0 : needs_update = true;
242 0 : }
243 : }
244 0 : (_, PageserverState::WarmingUp { started_at }) => {
245 0 : if now - *started_at >= self.max_warming_up_interval {
246 0 : *ps_state = PageserverState::Offline;
247 0 : }
248 :
249 0 : deltas.push((*node_id, ps_state.clone()));
250 0 : needs_update = true;
251 : }
252 0 : _ => {
253 0 : deltas.push((*node_id, ps_state.clone()));
254 0 : needs_update = true;
255 0 : }
256 : },
257 0 : Vacant(_) => {
258 0 : // This is a new node. Don't generate a delta for it.
259 0 : deltas.push((*node_id, ps_state.clone()));
260 0 : }
261 : }
262 :
263 0 : match entry {
264 0 : Occupied(mut occ) if needs_update => {
265 0 : (*occ.get_mut()) = ps_state.clone();
266 0 : }
267 0 : Vacant(vac) => {
268 0 : vac.insert(ps_state.clone());
269 0 : }
270 0 : _ => {}
271 : }
272 : }
273 :
274 0 : Ok(AvailablityDeltas(deltas))
275 0 : }
276 : }
|