Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use std::{
3 : collections::HashMap,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
10 :
11 : use thiserror::Error;
12 : use utils::id::NodeId;
13 :
14 : use crate::node::Node;
15 :
16 : struct HeartbeaterTask {
17 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
18 : cancel: CancellationToken,
19 :
20 : state: HashMap<NodeId, PageserverState>,
21 :
22 : max_offline_interval: Duration,
23 : max_warming_up_interval: Duration,
24 : jwt_token: Option<String>,
25 : }
26 :
27 : #[derive(Debug, Clone)]
28 : pub(crate) enum PageserverState {
29 : Available {
30 : last_seen_at: Instant,
31 : utilization: PageserverUtilization,
32 : },
33 : WarmingUp {
34 : started_at: Instant,
35 : },
36 : Offline,
37 : }
38 :
39 : #[derive(Debug)]
40 : pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
41 :
42 0 : #[derive(Debug, Error)]
43 : pub(crate) enum HeartbeaterError {
44 : #[error("Cancelled")]
45 : Cancel,
46 : }
47 :
48 : struct HeartbeatRequest {
49 : pageservers: Arc<HashMap<NodeId, Node>>,
50 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
51 : }
52 :
53 : pub(crate) struct Heartbeater {
54 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
55 : }
56 :
57 : impl Heartbeater {
58 0 : pub(crate) fn new(
59 0 : jwt_token: Option<String>,
60 0 : max_offline_interval: Duration,
61 0 : max_warming_up_interval: Duration,
62 0 : cancel: CancellationToken,
63 0 : ) -> Self {
64 0 : let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
65 0 : let mut heartbeater = HeartbeaterTask::new(
66 0 : receiver,
67 0 : jwt_token,
68 0 : max_offline_interval,
69 0 : max_warming_up_interval,
70 0 : cancel,
71 0 : );
72 0 : tokio::task::spawn(async move { heartbeater.run().await });
73 0 :
74 0 : Self { sender }
75 0 : }
76 :
77 0 : pub(crate) async fn heartbeat(
78 0 : &self,
79 0 : pageservers: Arc<HashMap<NodeId, Node>>,
80 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
81 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
82 0 : self.sender
83 0 : .send(HeartbeatRequest {
84 0 : pageservers,
85 0 : reply: sender,
86 0 : })
87 0 : .map_err(|_| HeartbeaterError::Cancel)?;
88 :
89 0 : receiver
90 0 : .await
91 0 : .map_err(|_| HeartbeaterError::Cancel)
92 0 : .and_then(|x| x)
93 0 : }
94 : }
95 :
96 : impl HeartbeaterTask {
97 0 : fn new(
98 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
99 0 : jwt_token: Option<String>,
100 0 : max_offline_interval: Duration,
101 0 : max_warming_up_interval: Duration,
102 0 : cancel: CancellationToken,
103 0 : ) -> Self {
104 0 : Self {
105 0 : receiver,
106 0 : cancel,
107 0 : state: HashMap::new(),
108 0 : max_offline_interval,
109 0 : max_warming_up_interval,
110 0 : jwt_token,
111 0 : }
112 0 : }
113 :
114 0 : async fn run(&mut self) {
115 : loop {
116 0 : tokio::select! {
117 0 : request = self.receiver.recv() => {
118 0 : match request {
119 0 : Some(req) => {
120 0 : let res = self.heartbeat(req.pageservers).await;
121 0 : req.reply.send(res).unwrap();
122 : },
123 0 : None => { return; }
124 : }
125 : },
126 0 : _ = self.cancel.cancelled() => return
127 : }
128 : }
129 0 : }
130 :
131 0 : async fn heartbeat(
132 0 : &mut self,
133 0 : pageservers: Arc<HashMap<NodeId, Node>>,
134 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
135 0 : let mut new_state = HashMap::new();
136 0 :
137 0 : let mut heartbeat_futs = FuturesUnordered::new();
138 0 : for (node_id, node) in &*pageservers {
139 0 : heartbeat_futs.push({
140 0 : let jwt_token = self.jwt_token.clone();
141 0 : let cancel = self.cancel.clone();
142 0 :
143 0 : // Clone the node and mark it as available such that the request
144 0 : // goes through to the pageserver even when the node is marked offline.
145 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
146 0 : let mut node_clone = node.clone();
147 0 : node_clone
148 0 : .set_availability(NodeAvailability::Active(PageserverUtilization::full()));
149 0 :
150 0 : async move {
151 0 : let response = node_clone
152 0 : .with_client_retries(
153 0 : |client| async move { client.get_utilization().await },
154 0 : &jwt_token,
155 0 : 3,
156 0 : 3,
157 0 : Duration::from_secs(1),
158 0 : &cancel,
159 0 : )
160 0 : .await;
161 :
162 0 : let response = match response {
163 0 : Some(r) => r,
164 : None => {
165 : // This indicates cancellation of the request.
166 : // We ignore the node in this case.
167 0 : return None;
168 : }
169 : };
170 :
171 0 : let status = if let Ok(utilization) = response {
172 0 : PageserverState::Available {
173 0 : last_seen_at: Instant::now(),
174 0 : utilization,
175 0 : }
176 0 : } else if let NodeAvailability::WarmingUp(last_seen_at) =
177 0 : node.get_availability()
178 : {
179 0 : PageserverState::WarmingUp {
180 0 : started_at: *last_seen_at,
181 0 : }
182 : } else {
183 0 : PageserverState::Offline
184 : };
185 :
186 0 : Some((*node_id, status))
187 0 : }
188 0 : });
189 :
190 : loop {
191 0 : let maybe_status = tokio::select! {
192 0 : next = heartbeat_futs.next() => {
193 0 : match next {
194 0 : Some(result) => result,
195 0 : None => { break; }
196 : }
197 : },
198 0 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
199 : };
200 :
201 0 : if let Some((node_id, status)) = maybe_status {
202 0 : new_state.insert(node_id, status);
203 0 : }
204 : }
205 : }
206 :
207 0 : let mut warming_up = 0;
208 0 : let mut offline = 0;
209 0 : for state in new_state.values() {
210 0 : match state {
211 0 : PageserverState::WarmingUp { .. } => {
212 0 : warming_up += 1;
213 0 : }
214 0 : PageserverState::Offline { .. } => offline += 1,
215 0 : PageserverState::Available { .. } => {}
216 : }
217 : }
218 :
219 0 : tracing::info!(
220 0 : "Heartbeat round complete for {} nodes, {} warming-up, {} offline",
221 0 : new_state.len(),
222 : warming_up,
223 : offline
224 : );
225 :
226 0 : let mut deltas = Vec::new();
227 0 : let now = Instant::now();
228 0 : for (node_id, ps_state) in new_state.iter_mut() {
229 : use std::collections::hash_map::Entry::*;
230 0 : let entry = self.state.entry(*node_id);
231 0 :
232 0 : let mut needs_update = false;
233 0 : match entry {
234 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
235 0 : (PageserverState::Offline, PageserverState::Offline) => {}
236 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
237 0 : if now - *last_seen_at >= self.max_offline_interval {
238 0 : deltas.push((*node_id, ps_state.clone()));
239 0 : needs_update = true;
240 0 : }
241 : }
242 0 : (_, PageserverState::WarmingUp { started_at }) => {
243 0 : if now - *started_at >= self.max_warming_up_interval {
244 0 : *ps_state = PageserverState::Offline;
245 0 : }
246 :
247 0 : deltas.push((*node_id, ps_state.clone()));
248 0 : needs_update = true;
249 : }
250 0 : _ => {
251 0 : deltas.push((*node_id, ps_state.clone()));
252 0 : needs_update = true;
253 0 : }
254 : },
255 0 : Vacant(_) => {
256 0 : // This is a new node. Don't generate a delta for it.
257 0 : deltas.push((*node_id, ps_state.clone()));
258 0 : }
259 : }
260 :
261 0 : match entry {
262 0 : Occupied(mut occ) if needs_update => {
263 0 : (*occ.get_mut()) = ps_state.clone();
264 0 : }
265 0 : Vacant(vac) => {
266 0 : vac.insert(ps_state.clone());
267 0 : }
268 0 : _ => {}
269 : }
270 : }
271 :
272 0 : Ok(AvailablityDeltas(deltas))
273 0 : }
274 : }
|