Line data Source code
1 : use futures::{stream::FuturesUnordered, StreamExt};
2 : use std::{
3 : collections::HashMap,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 : use tokio_util::sync::CancellationToken;
8 :
9 : use pageserver_api::{
10 : controller_api::{NodeAvailability, UtilizationScore},
11 : models::PageserverUtilization,
12 : };
13 :
14 : use thiserror::Error;
15 : use utils::id::NodeId;
16 :
17 : use crate::node::Node;
18 :
19 : struct HeartbeaterTask {
20 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
21 : cancel: CancellationToken,
22 :
23 : state: HashMap<NodeId, PageserverState>,
24 :
25 : max_unavailable_interval: Duration,
26 : jwt_token: Option<String>,
27 : }
28 :
29 : #[derive(Debug, Clone)]
30 : pub(crate) enum PageserverState {
31 : Available {
32 : last_seen_at: Instant,
33 : utilization: PageserverUtilization,
34 : new: bool,
35 : },
36 : Offline,
37 : }
38 :
39 : #[derive(Debug)]
40 : pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);
41 :
42 0 : #[derive(Debug, Error)]
43 : pub(crate) enum HeartbeaterError {
44 : #[error("Cancelled")]
45 : Cancel,
46 : }
47 :
48 : struct HeartbeatRequest {
49 : pageservers: Arc<HashMap<NodeId, Node>>,
50 : reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
51 : }
52 :
53 : pub(crate) struct Heartbeater {
54 : sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
55 : }
56 :
57 : impl Heartbeater {
58 0 : pub(crate) fn new(
59 0 : jwt_token: Option<String>,
60 0 : max_unavailable_interval: Duration,
61 0 : cancel: CancellationToken,
62 0 : ) -> Self {
63 0 : let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
64 0 : let mut heartbeater =
65 0 : HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel);
66 0 : tokio::task::spawn(async move { heartbeater.run().await });
67 0 :
68 0 : Self { sender }
69 0 : }
70 :
71 0 : pub(crate) async fn heartbeat(
72 0 : &self,
73 0 : pageservers: Arc<HashMap<NodeId, Node>>,
74 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
75 0 : let (sender, receiver) = tokio::sync::oneshot::channel();
76 0 : self.sender
77 0 : .send(HeartbeatRequest {
78 0 : pageservers,
79 0 : reply: sender,
80 0 : })
81 0 : .unwrap();
82 0 :
83 0 : receiver.await.unwrap()
84 0 : }
85 : }
86 :
87 : impl HeartbeaterTask {
88 0 : fn new(
89 0 : receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
90 0 : jwt_token: Option<String>,
91 0 : max_unavailable_interval: Duration,
92 0 : cancel: CancellationToken,
93 0 : ) -> Self {
94 0 : Self {
95 0 : receiver,
96 0 : cancel,
97 0 : state: HashMap::new(),
98 0 : max_unavailable_interval,
99 0 : jwt_token,
100 0 : }
101 0 : }
102 :
103 0 : async fn run(&mut self) {
104 0 : loop {
105 0 : tokio::select! {
106 : request = self.receiver.recv() => {
107 : match request {
108 : Some(req) => {
109 : let res = self.heartbeat(req.pageservers).await;
110 : req.reply.send(res).unwrap();
111 : },
112 : None => { return; }
113 : }
114 : },
115 : _ = self.cancel.cancelled() => return
116 0 : }
117 0 : }
118 0 : }
119 :
120 0 : async fn heartbeat(
121 0 : &mut self,
122 0 : pageservers: Arc<HashMap<NodeId, Node>>,
123 0 : ) -> Result<AvailablityDeltas, HeartbeaterError> {
124 0 : let mut new_state = HashMap::new();
125 0 :
126 0 : let mut heartbeat_futs = FuturesUnordered::new();
127 0 : for (node_id, node) in &*pageservers {
128 0 : heartbeat_futs.push({
129 0 : let jwt_token = self.jwt_token.clone();
130 0 : let cancel = self.cancel.clone();
131 0 : let new_node = !self.state.contains_key(node_id);
132 0 :
133 0 : // Clone the node and mark it as available such that the request
134 0 : // goes through to the pageserver even when the node is marked offline.
135 0 : // This doesn't impact the availability observed by [`crate::service::Service`].
136 0 : let mut node = node.clone();
137 0 : node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
138 0 :
139 0 : async move {
140 0 : let response = node
141 0 : .with_client_retries(
142 0 : |client| async move { client.get_utilization().await },
143 0 : &jwt_token,
144 0 : 3,
145 0 : 3,
146 0 : Duration::from_secs(1),
147 0 : &cancel,
148 0 : )
149 0 : .await;
150 :
151 0 : let response = match response {
152 0 : Some(r) => r,
153 : None => {
154 : // This indicates cancellation of the request.
155 : // We ignore the node in this case.
156 0 : return None;
157 : }
158 : };
159 :
160 0 : let status = if let Ok(utilization) = response {
161 0 : PageserverState::Available {
162 0 : last_seen_at: Instant::now(),
163 0 : utilization,
164 0 : new: new_node,
165 0 : }
166 : } else {
167 0 : PageserverState::Offline
168 : };
169 :
170 0 : Some((*node_id, status))
171 0 : }
172 0 : });
173 :
174 : loop {
175 0 : let maybe_status = tokio::select! {
176 : next = heartbeat_futs.next() => {
177 : match next {
178 : Some(result) => result,
179 : None => { break; }
180 : }
181 : },
182 : _ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
183 : };
184 :
185 0 : if let Some((node_id, status)) = maybe_status {
186 0 : new_state.insert(node_id, status);
187 0 : }
188 : }
189 : }
190 0 : tracing::info!(
191 0 : "Heartbeat round complete for {} nodes, {} offline",
192 0 : new_state.len(),
193 0 : new_state
194 0 : .values()
195 0 : .filter(|s| match s {
196 : PageserverState::Available { .. } => {
197 0 : false
198 : }
199 0 : PageserverState::Offline => true,
200 0 : })
201 0 : .count()
202 : );
203 :
204 0 : let mut deltas = Vec::new();
205 0 : let now = Instant::now();
206 0 : for (node_id, ps_state) in new_state {
207 : use std::collections::hash_map::Entry::*;
208 0 : let entry = self.state.entry(node_id);
209 0 :
210 0 : let mut needs_update = false;
211 0 : match entry {
212 0 : Occupied(ref occ) => match (occ.get(), &ps_state) {
213 0 : (PageserverState::Offline, PageserverState::Offline) => {}
214 0 : (PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
215 0 : if now - *last_seen_at >= self.max_unavailable_interval {
216 0 : deltas.push((node_id, ps_state.clone()));
217 0 : needs_update = true;
218 0 : }
219 : }
220 0 : _ => {
221 0 : deltas.push((node_id, ps_state.clone()));
222 0 : needs_update = true;
223 0 : }
224 : },
225 0 : Vacant(_) => {
226 0 : // This is a new node. Don't generate a delta for it.
227 0 : deltas.push((node_id, ps_state.clone()));
228 0 : }
229 : }
230 :
231 0 : match entry {
232 0 : Occupied(mut occ) if needs_update => {
233 0 : (*occ.get_mut()) = ps_state;
234 0 : }
235 0 : Vacant(vac) => {
236 0 : vac.insert(ps_state);
237 0 : }
238 0 : _ => {}
239 : }
240 : }
241 :
242 0 : Ok(AvailablityDeltas(deltas))
243 0 : }
244 : }
|