Line data Source code
1 : use std::str::FromStr;
2 : use std::time::Duration;
3 :
4 : use pageserver_api::controller_api::{
5 : AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeLifecycle, NodeRegisterRequest,
6 : NodeSchedulingPolicy, TenantLocateResponseShard,
7 : };
8 : use pageserver_api::shard::TenantShardId;
9 : use pageserver_client::mgmt_api;
10 : use reqwest::StatusCode;
11 : use serde::Serialize;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::backoff;
14 : use utils::id::NodeId;
15 :
16 : use crate::pageserver_client::PageserverClient;
17 : use crate::persistence::NodePersistence;
18 : use crate::scheduler::MaySchedule;
19 :
/// Represents the in-memory description of a Node.
///
/// Scheduling statistics are maintained separately in [`crate::scheduler`].
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
pub(crate) struct Node {
    // Identifier of this node.
    id: NodeId,

    // Runtime availability, scheduling policy and lifecycle state of the node.
    availability: NodeAvailability,
    scheduling: NodeSchedulingPolicy,
    lifecycle: NodeLifecycle,

    // Management API endpoint. The same address is combined with either the HTTP or the
    // HTTPS port when building the base URL (see `Node::base_url`).
    listen_http_addr: String,
    listen_http_port: u16,
    listen_https_port: Option<u16>,

    // Postgres-protocol endpoint, and the optional gRPC endpoint. The constructors reject
    // a gRPC address without a port and vice versa.
    listen_pg_addr: String,
    listen_pg_port: u16,
    listen_grpc_addr: Option<String>,
    listen_grpc_port: Option<u16>,

    // Availability zone this node was registered with.
    availability_zone_id: AvailabilityZone,

    // Flag from storcon's config to use https for pageserver admin API.
    // Invariant: if |true|, listen_https_port should contain a value.
    use_https: bool,
    // This cancellation token means "stop any RPCs in flight to this node, and don't start
    // any more". It is not related to process shutdown.
    // It is skipped during serialization.
    #[serde(skip)]
    cancel: CancellationToken,
}
53 :
/// Width of the NodeId range reserved for each pool.
#[allow(dead_code)]
const ONE_MILLION: i64 = 1_000_000;

/// Converts a pool ID to a large number that can be used to assign unique IDs to pods in
/// StatefulSets.
///
/// For example, if pool_id is 1, then the pods have NodeIds 1000000, 1000001, 1000002, etc.
/// If pool_id is None, then the pods have NodeIds 0, 1, 2, etc.
#[allow(dead_code)]
pub fn transform_pool_id(pool_id: Option<i32>) -> i64 {
    // Widening to i64 first means the multiplication cannot overflow for any i32 input.
    pool_id.map_or(0, |id| i64::from(id) * ONE_MILLION)
}
67 :
68 : #[allow(dead_code)]
69 0 : pub fn get_pool_id_from_node_id(node_id: i64) -> i32 {
70 0 : (node_id / ONE_MILLION) as i32
71 0 : }
72 :
73 : /// Example pod name: page-server-0-1, safe-keeper-1-0
74 : #[allow(dead_code)]
75 3 : pub fn get_node_id_from_pod_name(pod_name: &str) -> anyhow::Result<NodeId> {
76 3 : let parts: Vec<&str> = pod_name.split('-').collect();
77 3 : if parts.len() != 4 {
78 1 : return Err(anyhow::anyhow!("Invalid pod name: {}", pod_name));
79 2 : }
80 2 : let pool_id = parts[2].parse::<i32>()?;
81 2 : let node_offset = parts[3].parse::<i64>()?;
82 2 : let node_id = transform_pool_id(Some(pool_id)) + node_offset;
83 :
84 2 : Ok(NodeId(node_id as u64))
85 3 : }
86 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
    // Offline or WarmingUp -> Active.
    ToActive,
    // Active -> WarmingUp.
    ToWarmingUpFromActive,
    // Offline -> WarmingUp.
    ToWarmingUpFromOffline,
    // Active or WarmingUp -> Offline.
    ToOffline,
    // Any other combination, i.e. the availability variant stays the same.
    Unchanged,
}
96 :
97 : impl Node {
98 0 : pub(crate) fn base_url(&self) -> String {
99 0 : if self.use_https {
100 0 : format!(
101 0 : "https://{}:{}",
102 : self.listen_http_addr,
103 0 : self.listen_https_port
104 0 : .expect("https port should be specified if use_https is on")
105 : )
106 : } else {
107 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
108 : }
109 0 : }
110 :
/// Returns this node's ID.
pub(crate) fn get_id(&self) -> NodeId {
    self.id
}
114 :
/// Returns a reference to the availability zone stored on this node.
#[allow(unused)]
pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
    &self.availability_zone_id
}
119 :
/// Returns the node's current scheduling policy.
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
    self.scheduling
}
123 :
/// Replaces the node's scheduling policy with `scheduling`.
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
    self.scheduling = scheduling
}
127 :
/// Returns `true` if an HTTPS port is stored on this node.
pub(crate) fn has_https_port(&self) -> bool {
    self.listen_https_port.is_some()
}
131 :
132 : /// Does this registration request match `self`? This is used when deciding whether a registration
133 : /// request should be allowed to update an existing record with the same node ID.
134 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
135 0 : self.id == register_req.node_id
136 0 : && self.listen_http_addr == register_req.listen_http_addr
137 0 : && self.listen_http_port == register_req.listen_http_port
138 : // Note: HTTPS and gRPC addresses may change, to allow for migrations. See
139 : // [`Self::need_update`] for more details.
140 0 : && self.listen_pg_addr == register_req.listen_pg_addr
141 0 : && self.listen_pg_port == register_req.listen_pg_port
142 0 : && self.availability_zone_id == register_req.availability_zone_id
143 0 : }
144 :
145 : // Do we need to update an existing record in DB on this registration request?
146 0 : pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
147 : // These are checked here, since they may change before we're fully migrated.
148 0 : self.listen_https_port != register_req.listen_https_port
149 0 : || self.listen_grpc_addr != register_req.listen_grpc_addr
150 0 : || self.listen_grpc_port != register_req.listen_grpc_port
151 0 : }
152 :
153 : /// For a shard located on this node, populate a response object
154 : /// with this node's address information.
155 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
156 0 : TenantLocateResponseShard {
157 0 : shard_id,
158 0 : node_id: self.id,
159 0 : listen_http_addr: self.listen_http_addr.clone(),
160 0 : listen_http_port: self.listen_http_port,
161 0 : listen_https_port: self.listen_https_port,
162 0 : listen_pg_addr: self.listen_pg_addr.clone(),
163 0 : listen_pg_port: self.listen_pg_port,
164 0 : listen_grpc_addr: self.listen_grpc_addr.clone(),
165 0 : listen_grpc_port: self.listen_grpc_port,
166 0 : }
167 0 : }
168 :
/// Returns a reference to the node's current availability.
pub(crate) fn get_availability(&self) -> &NodeAvailability {
    &self.availability
}
172 :
173 282 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
174 : use AvailabilityTransition::*;
175 : use NodeAvailability::WarmingUp;
176 :
177 282 : match self.get_availability_transition(&availability) {
178 278 : ToActive => {
179 278 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
180 278 : // users of previously-cloned copies of the node will still see the old cancellation
181 278 : // state. For example, Reconcilers in flight will have to complete and be spawned
182 278 : // again to realize that the node has become available.
183 278 : self.cancel = CancellationToken::new();
184 278 : }
185 2 : ToOffline | ToWarmingUpFromActive => {
186 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
187 2 : self.cancel.cancel();
188 2 : }
189 2 : Unchanged | ToWarmingUpFromOffline => {}
190 : }
191 :
192 282 : if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
193 0 : self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
194 282 : } else {
195 282 : self.availability = availability;
196 282 : }
197 282 : }
198 :
199 : /// Without modifying the availability of the node, convert the intended availability
200 : /// into a description of the transition.
201 282 : pub(crate) fn get_availability_transition(
202 282 : &self,
203 282 : availability: &NodeAvailability,
204 282 : ) -> AvailabilityTransition {
205 : use AvailabilityTransition::*;
206 : use NodeAvailability::*;
207 :
208 282 : match (&self.availability, availability) {
209 278 : (Offline, Active(_)) => ToActive,
210 2 : (Active(_), Offline) => ToOffline,
211 0 : (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
212 0 : (WarmingUp(_), Offline) => ToOffline,
213 0 : (WarmingUp(_), Active(_)) => ToActive,
214 0 : (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
215 2 : _ => Unchanged,
216 : }
217 282 : }
218 :
219 : /// Whether we may send API requests to this node.
220 278 : pub(crate) fn is_available(&self) -> bool {
221 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
222 : // a reference to the original Node's cancellation status. Checking both of these results
223 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
224 : // when we cloned it, or if the original Node instance's cancellation token was fired.
225 278 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
226 278 : }
227 :
228 : /// Is this node elegible to have work scheduled onto it?
229 285 : pub(crate) fn may_schedule(&self) -> MaySchedule {
230 285 : let utilization = match &self.availability {
231 283 : NodeAvailability::Active(u) => u.clone(),
232 2 : NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
233 : };
234 :
235 283 : match self.scheduling {
236 283 : NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
237 0 : NodeSchedulingPolicy::Deleting => MaySchedule::No,
238 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
239 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
240 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
241 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
242 : }
243 285 : }
244 :
245 : #[allow(clippy::too_many_arguments)]
246 278 : pub(crate) fn new(
247 278 : id: NodeId,
248 278 : listen_http_addr: String,
249 278 : listen_http_port: u16,
250 278 : listen_https_port: Option<u16>,
251 278 : listen_pg_addr: String,
252 278 : listen_pg_port: u16,
253 278 : listen_grpc_addr: Option<String>,
254 278 : listen_grpc_port: Option<u16>,
255 278 : availability_zone_id: AvailabilityZone,
256 278 : use_https: bool,
257 278 : ) -> anyhow::Result<Self> {
258 278 : if use_https && listen_https_port.is_none() {
259 0 : anyhow::bail!(
260 0 : "cannot create node {id}: \
261 0 : https is enabled, but https port is not specified"
262 : );
263 278 : }
264 :
265 278 : if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
266 0 : anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
267 278 : }
268 :
269 278 : Ok(Self {
270 278 : id,
271 278 : listen_http_addr,
272 278 : listen_http_port,
273 278 : listen_https_port,
274 278 : listen_pg_addr,
275 278 : listen_pg_port,
276 278 : listen_grpc_addr,
277 278 : listen_grpc_port,
278 278 : scheduling: NodeSchedulingPolicy::Active,
279 278 : lifecycle: NodeLifecycle::Active,
280 278 : availability: NodeAvailability::Offline,
281 278 : availability_zone_id,
282 278 : use_https,
283 278 : cancel: CancellationToken::new(),
284 278 : })
285 278 : }
286 :
287 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
288 : NodePersistence {
289 0 : node_id: self.id.0 as i64,
290 0 : scheduling_policy: self.scheduling.into(),
291 0 : lifecycle: self.lifecycle.into(),
292 0 : listen_http_addr: self.listen_http_addr.clone(),
293 0 : listen_http_port: self.listen_http_port as i32,
294 0 : listen_https_port: self.listen_https_port.map(|x| x as i32),
295 0 : listen_pg_addr: self.listen_pg_addr.clone(),
296 0 : listen_pg_port: self.listen_pg_port as i32,
297 0 : listen_grpc_addr: self.listen_grpc_addr.clone(),
298 0 : listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
299 0 : availability_zone_id: self.availability_zone_id.0.clone(),
300 : }
301 0 : }
302 :
303 0 : pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
304 0 : if use_https && np.listen_https_port.is_none() {
305 0 : anyhow::bail!(
306 0 : "cannot load node {} from persistent: \
307 0 : https is enabled, but https port is not specified",
308 : np.node_id,
309 : );
310 0 : }
311 :
312 0 : if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
313 0 : anyhow::bail!(
314 0 : "can't load node {}: must specify both gRPC address and port",
315 : np.node_id
316 : );
317 0 : }
318 :
319 : Ok(Self {
320 0 : id: NodeId(np.node_id as u64),
321 : // At startup we consider a node offline until proven otherwise.
322 0 : availability: NodeAvailability::Offline,
323 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
324 0 : .expect("Bad scheduling policy in DB"),
325 0 : lifecycle: NodeLifecycle::from_str(&np.lifecycle).expect("Bad lifecycle in DB"),
326 0 : listen_http_addr: np.listen_http_addr,
327 0 : listen_http_port: np.listen_http_port as u16,
328 0 : listen_https_port: np.listen_https_port.map(|x| x as u16),
329 0 : listen_pg_addr: np.listen_pg_addr,
330 0 : listen_pg_port: np.listen_pg_port as u16,
331 0 : listen_grpc_addr: np.listen_grpc_addr,
332 0 : listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
333 0 : availability_zone_id: AvailabilityZone(np.availability_zone_id),
334 0 : use_https,
335 0 : cancel: CancellationToken::new(),
336 : })
337 0 : }
338 :
/// Wrapper for issuing requests to pageserver management API: takes care of generic
/// retry/backoff for retryable HTTP status codes.
///
/// `op` is invoked once per attempt with a freshly built [`PageserverClient`] that targets
/// [`Self::base_url`]. Each attempt is individually bounded by `timeout`, and is raced against
/// this node's own cancellation token.
///
/// `warn_threshold`, `max_retries` and `cancel` are passed through to `backoff::retry`
/// unchanged.
///
/// This will return None to indicate cancellation. Cancellation may happen from
/// the cancellation token passed in, or from Self's cancellation token (i.e. node
/// going offline).
#[allow(clippy::too_many_arguments)]
pub(crate) async fn with_client_retries<T, O, F>(
    &self,
    mut op: O,
    http_client: &reqwest::Client,
    jwt: &Option<String>,
    warn_threshold: u32,
    max_retries: u32,
    timeout: Duration,
    cancel: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
    O: FnMut(PageserverClient) -> F,
    F: std::future::Future<Output = mgmt_api::Result<T>>,
{
    // Error classifier handed to `backoff::retry`: `true` marks an error as fatal.
    // NOTE(review): presumably fatal errors stop the retry loop - confirm against
    // `utils::backoff::retry`.
    fn is_fatal(e: &mgmt_api::Error) -> bool {
        use mgmt_api::Error::*;
        match e {
            // Failures while sending the request or receiving a body are not fatal.
            SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
            // 503, 504 and 408 responses are not fatal...
            ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
            | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
            | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
            // ...while every other API error status is.
            ApiError(_, _) => true,
            // Produced below when the node's cancellation token fires.
            Cancelled => true,
            // Produced below when a single attempt exceeds `timeout`.
            Timeout(_) => false,
        }
    }

    backoff::retry(
        || {
            // A new client is built for every attempt.
            let client = PageserverClient::new(
                self.get_id(),
                http_client.clone(),
                self.base_url(),
                jwt.as_deref(),
            );

            let node_cancel_fut = self.cancel.cancelled();

            // The timeout wraps a single invocation of `op`, not the whole retry loop.
            let op_fut = tokio::time::timeout(timeout, op(client));

            async {
                tokio::select! {
                    r = op_fut => match r {
                        Ok(r) => r,
                        // The attempt timed out: surface it as a `Timeout` error.
                        Err(e) => Err(mgmt_api::Error::Timeout(format!("{e}"))),
                    },
                    // The node's own token fired while the attempt was in flight.
                    _ = node_cancel_fut => {
                        Err(mgmt_api::Error::Cancelled)
                    }}
            }
        },
        is_fatal,
        warn_threshold,
        max_retries,
        // Description string handed to `backoff::retry`.
        &format!(
            "Call to node {} ({}) management API",
            self.id,
            self.base_url(),
        ),
        cancel,
    )
    .await
}
409 :
410 : /// Generate the simplified API-friendly description of a node's state
411 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
412 0 : NodeDescribeResponse {
413 0 : id: self.id,
414 0 : availability: self.availability.clone().into(),
415 0 : scheduling: self.scheduling,
416 0 : availability_zone_id: self.availability_zone_id.0.clone(),
417 0 : listen_http_addr: self.listen_http_addr.clone(),
418 0 : listen_http_port: self.listen_http_port,
419 0 : listen_https_port: self.listen_https_port,
420 0 : listen_pg_addr: self.listen_pg_addr.clone(),
421 0 : listen_pg_port: self.listen_pg_port,
422 0 : listen_grpc_addr: self.listen_grpc_addr.clone(),
423 0 : listen_grpc_port: self.listen_grpc_port,
424 0 : }
425 0 : }
426 : }
427 :
428 : impl std::fmt::Display for Node {
429 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
431 0 : }
432 : }
433 :
434 : impl std::fmt::Debug for Node {
435 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
437 0 : }
438 : }
439 :
#[cfg(test)]
mod tests {
    use utils::id::NodeId;

    use crate::node::get_node_id_from_pod_name;

    /// Well-formed pod names map to `pool * 1_000_000 + offset`; a name without four
    /// `-`-separated parts is rejected.
    #[test]
    fn test_get_node_id_from_pod_name() {
        let valid_cases = [
            ("page-server-3-12", NodeId(3000012)),
            ("safe-keeper-1-0", NodeId(1000000)),
        ];
        for (pod_name, expected) in valid_cases {
            let node_id = get_node_id_from_pod_name(pod_name).unwrap();
            assert_eq!(node_id, expected);
        }

        assert!(get_node_id_from_pod_name("invalid-pod-name").is_err());
    }
}
|