Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use std::{
3 : collections::HashMap,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use pageserver_api::{
10 : controller_api::{NodeAvailability, UtilizationScore},
11 : models::PageserverUtilization,
12 : };
13 :
14 : use thiserror::Error;
15 : use utils::id::NodeId;
16 :
17 : use crate::node::Node;
18 :
19 : struct HeartbeaterTask {
20 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
21 : cancel: CancellationToken,
22 :
23 : state: HashMap<NodeId, PageserverState>,
24 :
25 : max_offline_interval: Duration,
26 : max_warming_up_interval: Duration,
27 : jwt_token: Option<String>,
28 : }
29 :
30 : #[derive(Debug, Clone)]
31 : pub(crate) enum PageserverState {
32 : Available {
33 : last_seen_at: Instant,
34 : utilization: PageserverUtilization,
35 : },
36 : WarmingUp {
37 : started_at: Instant,
38 : },
39 : Offline,
40 : }
41 :
42 : #[derive(Debug)]
43 : pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
44 :
45 0 : #[derive(Debug, Error)]
46 : pub(crate) enum HeartbeaterError {
47 : #[error("Cancelled")]
48 : Cancel,
49 : }
50 :
51 : struct HeartbeatRequest {
52 : pageservers: Arc<HashMap<NodeId, Node>>,
53 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
54 : }
55 :
56 : pub(crate) struct Heartbeater {
57 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
58 : }
59 :
60 : impl Heartbeater {
61 0 : pub(crate) fn new(
62 0 : jwt_token: Option<String>,
63 0 : max_offline_interval: Duration,
64 0 : max_warming_up_interval: Duration,
65 0 : cancel: CancellationToken,
66 0 : ) -> Self {
67 0 : let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
68 0 : let mut heartbeater = HeartbeaterTask::new(
69 0 : receiver,
70 0 : jwt_token,
71 0 : max_offline_interval,
72 0 : max_warming_up_interval,
73 0 : cancel,
74 0 : );
75 0 : tokio::task::spawn(async move { heartbeater.run().await });
76 0 :
77 0 : Self { sender }
78 0 : }
79 :
80 0 : pub(crate) async fn heartbeat(
81 0 : &self,
82 0 : pageservers: Arc<HashMap<NodeId, Node>>,
83 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
84 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
85 0 : self.sender
86 0 : .send(HeartbeatRequest {
87 0 : pageservers,
88 0 : reply: sender,
89 0 : })
90 0 : .unwrap();
91 0 :
92 0 : receiver.await.unwrap()
93 0 : }
94 : }
95 :
96 : impl HeartbeaterTask {
97 0 : fn new(
98 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
99 0 : jwt_token: Option<String>,
100 0 : max_offline_interval: Duration,
101 0 : max_warming_up_interval: Duration,
102 0 : cancel: CancellationToken,
103 0 : ) -> Self {
104 0 : Self {
105 0 : receiver,
106 0 : cancel,
107 0 : state: HashMap::new(),
108 0 : max_offline_interval,
109 0 : max_warming_up_interval,
110 0 : jwt_token,
111 0 : }
112 0 : }
113 :
114 0 : async fn run(&mut self) {
115 0 : loop {
116 0 : tokio::select! {
117 : request = self.receiver.recv() => {
118 : match request {
119 : Some(req) => {
120 : let res = self.heartbeat(req.pageservers).await;
121 : req.reply.send(res).unwrap();
122 : },
123 : None => { return; }
124 : }
125 : },
126 : _ = self.cancel.cancelled() => return
127 0 : }
128 0 : }
129 0 : }
130 :
131 0 : async fn heartbeat(
132 0 : &mut self,
133 0 : pageservers: Arc<HashMap<NodeId, Node>>,
134 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
135 0 : let mut new_state = HashMap::new();
136 0 :
137 0 : let mut heartbeat_futs = FuturesUnordered::new();
138 0 : for (node_id, node) in &*pageservers {
139 0 : heartbeat_futs.push({
140 0 : let jwt_token = self.jwt_token.clone();
141 0 : let cancel = self.cancel.clone();
142 0 :
143 0 : // Clone the node and mark it as available such that the request
144 0 : // goes through to the pageserver even when the node is marked offline.
145 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
146 0 : let mut node_clone = node.clone();
147 0 : node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
148 0 :
149 0 : async move {
150 0 : let response = node_clone
151 0 : .with_client_retries(
152 0 : |client| async move { client.get_utilization().await },
153 0 : &jwt_token,
154 0 : 3,
155 0 : 3,
156 0 : Duration::from_secs(1),
157 0 : &cancel,
158 0 : )
159 0 : .await;
160 :
161 0 : let response = match response {
162 0 : Some(r) => r,
163 : None => {
164 : // This indicates cancellation of the request.
165 : // We ignore the node in this case.
166 0 : return None;
167 : }
168 : };
169 :
170 0 : let status = if let Ok(utilization) = response {
171 0 : PageserverState::Available {
172 0 : last_seen_at: Instant::now(),
173 0 : utilization,
174 0 : }
175 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
176 0 : node.get_availability()
177 : {
178 0 : PageserverState::WarmingUp {
179 0 : started_at: last_seen_at,
180 0 : }
181 : } else {
182 0 : PageserverState::Offline
183 : };
184 :
185 0 : Some((*node_id, status))
186 0 : }
187 0 : });
188 :
189 : loop {
190 0 : let maybe_status = tokio::select! {
191 : next = heartbeat_futs.next() => {
192 : match next {
193 : Some(result) => result,
194 : None => { break; }
195 : }
196 : },
197 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
198 : };
199 :
200 0 : if let Some((node_id, status)) = maybe_status {
201 0 : new_state.insert(node_id, status);
202 0 : }
203 : }
204 : }
205 :
206 0 : let mut warming_up = 0;
207 0 : let mut offline = 0;
208 0 : for state in new_state.values() {
209 0 : match state {
210 0 : PageserverState::WarmingUp { .. } => {
211 0 : warming_up += 1;
212 0 : }
213 0 : PageserverState::Offline { .. } => offline += 1,
214 0 : PageserverState::Available { .. } => {}
215 : }
216 : }
217 :
218 0 : tracing::info!(
219 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
220 0 : new_state.len(),
221 : warming_up,
222 : offline
223 : );
224 :
225 0 : let mut deltas = Vec::new();
226 0 : let now = Instant::now();
227 0 : for (node_id, ps_state) in new_state.iter_mut() {
228 : use std::collections::hash_map::Entry::*;
229 0 : let entry = self.state.entry(*node_id);
230 0 :
231 0 : let mut needs_update = false;
232 0 : match entry {
233 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
234 0 : (PageserverState::Offline, PageserverState::Offline) => {}
235 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
236 0 : if now - *last_seen_at >= self.max_offline_interval {
237 0 : deltas.push((*node_id, ps_state.clone()));
238 0 : needs_update = true;
239 0 : }
240 : }
241 0 : (_, PageserverState::WarmingUp { started_at }) => {
242 0 : if now - *started_at >= self.max_warming_up_interval {
243 0 : *ps_state = PageserverState::Offline;
244 0 : }
245 :
246 0 : deltas.push((*node_id, ps_state.clone()));
247 0 : needs_update = true;
248 : }
249 0 : _ => {
250 0 : deltas.push((*node_id, ps_state.clone()));
251 0 : needs_update = true;
252 0 : }
253 : },
254 0 : Vacant(_) => {
255 0 : // This is a new node. Don't generate a delta for it.
256 0 : deltas.push((*node_id, ps_state.clone()));
257 0 : }
258 : }
259 :
260 0 : match entry {
261 0 : Occupied(mut occ) if needs_update => {
262 0 : (*occ.get_mut()) = ps_state.clone();
263 0 : }
264 0 : Vacant(vac) => {
265 0 : vac.insert(ps_state.clone());
266 0 : }
267 0 : _ => {}
268 : }
269 : }
270 :
271 0 : Ok(AvailablityDeltas(deltas))
272 0 : }
273 : }
|