use std::{str::FromStr, time::Duration};

use pageserver_api::{
    controller_api::{
        NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
        TenantLocateResponseShard,
    },
    shard::TenantShardId,
};
use pageserver_client::mgmt_api;
use reqwest::StatusCode;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId};

use crate::{
    pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
};

/// Represents the in-memory description of a Node.
///
/// Scheduling statistics are maintained separately in [`crate::scheduler`].
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
pub(crate) struct Node {
    id: NodeId,

    availability: NodeAvailability,
    scheduling: NodeSchedulingPolicy,

    listen_http_addr: String,
    listen_http_port: u16,

    listen_pg_addr: String,
    listen_pg_port: u16,

    availability_zone_id: Option<String>,

    // This cancellation token means "stop any RPCs in flight to this node, and don't start
    // any more". It is not related to process shutdown.
    #[serde(skip)]
    cancel: CancellationToken,
}

/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
    ToActive,
    ToWarmingUpFromActive,
    ToWarmingUpFromOffline,
    ToOffline,
    Unchanged,
}

impl Node {
    pub(crate) fn base_url(&self) -> String {
        format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
    }

    pub(crate) fn get_id(&self) -> NodeId {
        self.id
    }

    pub(crate) fn get_availability_zone_id(&self) -> Option<&str> {
        self.availability_zone_id.as_deref()
    }

    pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
        self.scheduling
    }

    pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
        self.scheduling = scheduling
    }

    /// Does this registration request match `self`? This is used when deciding whether a registration
    /// request should be allowed to update an existing record with the same node ID.
    pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
        let az_ids_match = {
            match (
                self.availability_zone_id.as_deref(),
                register_req.availability_zone_id.as_deref(),
            ) {
                (Some(current_az), Some(register_req_az)) => current_az == register_req_az,
                _ => true,
            }
        };

        az_ids_match
            && self.id == register_req.node_id
            && self.listen_http_addr == register_req.listen_http_addr
            && self.listen_http_port == register_req.listen_http_port
            && self.listen_pg_addr == register_req.listen_pg_addr
            && self.listen_pg_port == register_req.listen_pg_port
    }
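
    // Hedged usage sketch (not part of the original source): the caller side of the register
    // API might use this check to decide between updating an existing record and rejecting a
    // conflicting registration. `node` and `register_req` are hypothetical names.
    //
    //     if node.registration_match(&register_req) {
    //         // Same node re-registering (e.g. after a restart, or gaining an AZ id):
    //         // allow the record to be updated in place.
    //     } else {
    //         // Addresses/ports/AZ conflict for this node ID: reject the registration.
    //     }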

    /// For a shard located on this node, populate a response object
    /// with this node's address information.
    pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
        TenantLocateResponseShard {
            shard_id,
            node_id: self.id,
            listen_http_addr: self.listen_http_addr.clone(),
            listen_http_port: self.listen_http_port,
            listen_pg_addr: self.listen_pg_addr.clone(),
            listen_pg_port: self.listen_pg_port,
        }
    }

    pub(crate) fn get_availability(&self) -> &NodeAvailability {
        &self.availability
    }

    pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
        use AvailabilityTransition::*;
        use NodeAvailability::WarmingUp;

        match self.get_availability_transition(&availability) {
            ToActive => {
                // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
                // users of previously-cloned copies of the node will still see the old cancellation
                // state. For example, Reconcilers in flight will have to complete and be spawned
                // again to realize that the node has become available.
                self.cancel = CancellationToken::new();
            }
            ToOffline | ToWarmingUpFromActive => {
                // Fire the node's cancellation token to cancel any in-flight API requests to it
                self.cancel.cancel();
            }
            Unchanged | ToWarmingUpFromOffline => {}
        }

        if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
            self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
        } else {
            self.availability = availability;
        }
    }

    /// Without modifying the availability of the node, convert the intended availability
    /// into a description of the transition.
    pub(crate) fn get_availability_transition(
        &self,
        availability: &NodeAvailability,
    ) -> AvailabilityTransition {
        use AvailabilityTransition::*;
        use NodeAvailability::*;

        match (&self.availability, availability) {
            (Offline, Active(_)) => ToActive,
            (Active(_), Offline) => ToOffline,
            (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
            (WarmingUp(_), Offline) => ToOffline,
            (WarmingUp(_), Active(_)) => ToActive,
            (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
            _ => Unchanged,
        }
    }
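
    // Hedged sketch of the transition semantics above (not part of the original source): a
    // freshly constructed node starts Offline; marking it Active yields `ToActive` and swaps
    // in a fresh cancellation token, while Active -> Offline yields `ToOffline` and fires the
    // token so in-flight RPCs bail out. `utilization` stands in for whatever payload
    // `NodeAvailability::Active` carries; its construction is elided here.
    //
    //     let mut node = Node::new(NodeId(1), "ps-1.local".into(), 9898,
    //                              "ps-1.local".into(), 6400, None);
    //     assert!(matches!(
    //         node.get_availability_transition(&NodeAvailability::Active(utilization.clone())),
    //         AvailabilityTransition::ToActive
    //     ));
    //     node.set_availability(NodeAvailability::Active(utilization));
    //     assert!(node.is_available());
    //     node.set_availability(NodeAvailability::Offline);
    //     assert!(!node.is_available()); // the cancellation token has been fired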

    /// Whether we may send API requests to this node.
    pub(crate) fn is_available(&self) -> bool {
        // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
        // a reference to the original Node's cancellation status. Checking both of these results
        // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
        // when we cloned it, or if the original Node instance's cancellation token was fired.
        matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
    }

    /// Is this node eligible to have work scheduled onto it?
    pub(crate) fn may_schedule(&self) -> MaySchedule {
        let utilization = match &self.availability {
            NodeAvailability::Active(u) => u.clone(),
            NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
        };

        match self.scheduling {
            NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
            NodeSchedulingPolicy::Draining => MaySchedule::No,
            NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
            NodeSchedulingPolicy::Pause => MaySchedule::No,
            NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
        }
    }

    pub(crate) fn new(
        id: NodeId,
        listen_http_addr: String,
        listen_http_port: u16,
        listen_pg_addr: String,
        listen_pg_port: u16,
        availability_zone_id: Option<String>,
    ) -> Self {
        Self {
            id,
            listen_http_addr,
            listen_http_port,
            listen_pg_addr,
            listen_pg_port,
            scheduling: NodeSchedulingPolicy::Active,
            availability: NodeAvailability::Offline,
            availability_zone_id,
            cancel: CancellationToken::new(),
        }
    }

    pub(crate) fn to_persistent(&self) -> NodePersistence {
        NodePersistence {
            node_id: self.id.0 as i64,
            scheduling_policy: self.scheduling.into(),
            listen_http_addr: self.listen_http_addr.clone(),
            listen_http_port: self.listen_http_port as i32,
            listen_pg_addr: self.listen_pg_addr.clone(),
            listen_pg_port: self.listen_pg_port as i32,
            availability_zone_id: self.availability_zone_id.clone(),
        }
    }

    pub(crate) fn from_persistent(np: NodePersistence) -> Self {
        Self {
            id: NodeId(np.node_id as u64),
            // At startup we consider a node offline until proven otherwise.
            availability: NodeAvailability::Offline,
            scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
                .expect("Bad scheduling policy in DB"),
            listen_http_addr: np.listen_http_addr,
            listen_http_port: np.listen_http_port as u16,
            listen_pg_addr: np.listen_pg_addr,
            listen_pg_port: np.listen_pg_port as u16,
            availability_zone_id: np.availability_zone_id,
            cancel: CancellationToken::new(),
        }
    }
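
    // Hedged sketch (not part of the original source): persisting and reloading a node keeps
    // its identity, addresses and scheduling policy, but deliberately resets availability to
    // Offline and hands out a fresh cancellation token, so a restarted storage controller must
    // re-prove node health before scheduling onto it.
    //
    //     let reloaded = Node::from_persistent(node.to_persistent());
    //     assert_eq!(reloaded.get_id(), node.get_id());
    //     assert!(matches!(reloaded.get_availability(), NodeAvailability::Offline));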

    /// Wrapper for issuing requests to pageserver management API: takes care of generic
    /// retry/backoff for retryable HTTP status codes.
    ///
    /// This will return None to indicate cancellation. Cancellation may happen from
    /// the cancellation token passed in, or from Self's cancellation token (i.e. node
    /// going offline).
    pub(crate) async fn with_client_retries<T, O, F>(
        &self,
        mut op: O,
        jwt: &Option<String>,
        warn_threshold: u32,
        max_retries: u32,
        timeout: Duration,
        cancel: &CancellationToken,
    ) -> Option<mgmt_api::Result<T>>
    where
        O: FnMut(PageserverClient) -> F,
        F: std::future::Future<Output = mgmt_api::Result<T>>,
    {
        fn is_fatal(e: &mgmt_api::Error) -> bool {
            use mgmt_api::Error::*;
            match e {
                SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
                ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
                | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
                | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
                ApiError(_, _) => true,
                Cancelled => true,
            }
        }

        backoff::retry(
            || {
                let http_client = reqwest::ClientBuilder::new()
                    .timeout(timeout)
                    .build()
                    .expect("Failed to construct HTTP client");

                let client = PageserverClient::from_client(
                    self.get_id(),
                    http_client,
                    self.base_url(),
                    jwt.as_deref(),
                );

                let node_cancel_fut = self.cancel.cancelled();

                let op_fut = op(client);

                async {
                    tokio::select! {
                        r = op_fut => r,
                        _ = node_cancel_fut => {
                            Err(mgmt_api::Error::Cancelled)
                        }
                    }
                }
            },
            is_fatal,
            warn_threshold,
            max_retries,
            &format!(
                "Call to node {} ({}:{}) management API",
                self.id, self.listen_http_addr, self.listen_http_port
            ),
            cancel,
        )
        .await
    }
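
    // Hedged usage sketch (not part of the original source): callers pass a closure that
    // receives the per-attempt `PageserverClient` and issues one request; `None` means the
    // operation was cancelled, either via the passed token or because this node's own token
    // fired (node went offline). `client.some_request()`, `jwt`, `cancel` and the literal
    // retry/timeout values below are placeholders, not real API.
    //
    //     let result = node
    //         .with_client_retries(
    //             |client| async move { client.some_request().await },
    //             &jwt,
    //             3,                       // warn_threshold
    //             10,                      // max_retries
    //             Duration::from_secs(10), // per-request timeout
    //             &cancel,
    //         )
    //         .await;
    //     match result {
    //         None => { /* cancelled: node offline or shutdown requested */ }
    //         Some(Ok(response)) => { /* request succeeded, possibly after retries */ }
    //         Some(Err(e)) => { /* fatal error or retries exhausted */ }
    //     }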

    /// Generate the simplified API-friendly description of a node's state
    pub(crate) fn describe(&self) -> NodeDescribeResponse {
        NodeDescribeResponse {
            id: self.id,
            availability: self.availability.clone().into(),
            scheduling: self.scheduling,
            listen_http_addr: self.listen_http_addr.clone(),
            listen_http_port: self.listen_http_port,
            listen_pg_addr: self.listen_pg_addr.clone(),
            listen_pg_port: self.listen_pg_port,
        }
    }
}

impl std::fmt::Display for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({})", self.id, self.listen_http_addr)
    }
}

impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({})", self.id, self.listen_http_addr)
    }
}