Line data Source code
1 : use std::str::FromStr;
2 : use std::time::Duration;
3 :
4 : use pageserver_api::controller_api::{
5 : AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
6 : NodeSchedulingPolicy, TenantLocateResponseShard,
7 : };
8 : use pageserver_api::shard::TenantShardId;
9 : use pageserver_client::mgmt_api;
10 : use reqwest::{Certificate, StatusCode};
11 : use serde::Serialize;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::backoff;
14 : use utils::id::NodeId;
15 :
16 : use crate::pageserver_client::PageserverClient;
17 : use crate::persistence::NodePersistence;
18 : use crate::scheduler::MaySchedule;
19 :
/// Represents the in-memory description of a Node.
///
/// Scheduling statistics are maintained separately in [`crate::scheduler`].
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
pub(crate) struct Node {
    // Identifier of this node; registration requests are compared against it.
    id: NodeId,

    // Current availability. Both constructors in this file start a node as `Offline`.
    availability: NodeAvailability,
    // Scheduling policy; combined with `availability` to decide whether work may be
    // scheduled onto this node.
    scheduling: NodeSchedulingPolicy,

    // Host and ports of the node's HTTP(S) endpoint. The https port is optional.
    listen_http_addr: String,
    listen_http_port: u16,
    listen_https_port: Option<u16>,

    // Host and port reported as the node's "pg" endpoint.
    listen_pg_addr: String,
    listen_pg_port: u16,

    availability_zone_id: AvailabilityZone,

    // Flag from storcon's config to use https for pageserver admin API.
    // Invariant: if |true|, listen_https_port should contain a value.
    use_https: bool,
    // This cancellation token means "stop any RPCs in flight to this node, and don't start
    // any more". It is not related to process shutdown.
    // Excluded from serialization.
    #[serde(skip)]
    cancel: CancellationToken,
}
50 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
    /// The node goes from `Offline` or `WarmingUp` to `Active`.
    ToActive,
    /// The node goes from `Active` to `WarmingUp`.
    ToWarmingUpFromActive,
    /// The node goes from `Offline` to `WarmingUp`.
    ToWarmingUpFromOffline,
    /// The node goes from `Active` or `WarmingUp` to `Offline`.
    ToOffline,
    /// The current and requested availability are the same variant.
    Unchanged,
}
60 :
61 : impl Node {
62 0 : pub(crate) fn base_url(&self) -> String {
63 0 : if self.use_https {
64 0 : format!(
65 0 : "https://{}:{}",
66 0 : self.listen_http_addr,
67 0 : self.listen_https_port
68 0 : .expect("https port should be specified if use_https is on")
69 0 : )
70 : } else {
71 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
72 : }
73 0 : }
74 :
75 285 : pub(crate) fn get_id(&self) -> NodeId {
76 285 : self.id
77 285 : }
78 :
79 : #[allow(unused)]
80 25481 : pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
81 25481 : &self.availability_zone_id
82 25481 : }
83 :
84 0 : pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
85 0 : self.scheduling
86 0 : }
87 :
88 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
89 0 : self.scheduling = scheduling
90 0 : }
91 :
92 : /// Does this registration request match `self`? This is used when deciding whether a registration
93 : /// request should be allowed to update an existing record with the same node ID.
94 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
95 0 : self.id == register_req.node_id
96 0 : && self.listen_http_addr == register_req.listen_http_addr
97 0 : && self.listen_http_port == register_req.listen_http_port
98 : // Note: listen_https_port may change. See [`Self::need_update`] for mode details.
99 : // && self.listen_https_port == register_req.listen_https_port
100 0 : && self.listen_pg_addr == register_req.listen_pg_addr
101 0 : && self.listen_pg_port == register_req.listen_pg_port
102 0 : && self.availability_zone_id == register_req.availability_zone_id
103 0 : }
104 :
105 : // Do we need to update an existing record in DB on this registration request?
106 0 : pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
107 0 : // listen_https_port is checked here because it may change during migration to https.
108 0 : // After migration, this check may be moved to registration_match.
109 0 : self.listen_https_port != register_req.listen_https_port
110 0 : }
111 :
112 : /// For a shard located on this node, populate a response object
113 : /// with this node's address information.
114 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
115 0 : TenantLocateResponseShard {
116 0 : shard_id,
117 0 : node_id: self.id,
118 0 : listen_http_addr: self.listen_http_addr.clone(),
119 0 : listen_http_port: self.listen_http_port,
120 0 : listen_https_port: self.listen_https_port,
121 0 : listen_pg_addr: self.listen_pg_addr.clone(),
122 0 : listen_pg_port: self.listen_pg_port,
123 0 : }
124 0 : }
125 :
126 0 : pub(crate) fn get_availability(&self) -> &NodeAvailability {
127 0 : &self.availability
128 0 : }
129 :
130 282 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
131 : use AvailabilityTransition::*;
132 : use NodeAvailability::WarmingUp;
133 :
134 282 : match self.get_availability_transition(&availability) {
135 278 : ToActive => {
136 278 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
137 278 : // users of previously-cloned copies of the node will still see the old cancellation
138 278 : // state. For example, Reconcilers in flight will have to complete and be spawned
139 278 : // again to realize that the node has become available.
140 278 : self.cancel = CancellationToken::new();
141 278 : }
142 2 : ToOffline | ToWarmingUpFromActive => {
143 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
144 2 : self.cancel.cancel();
145 2 : }
146 2 : Unchanged | ToWarmingUpFromOffline => {}
147 : }
148 :
149 282 : if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
150 0 : self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
151 282 : } else {
152 282 : self.availability = availability;
153 282 : }
154 282 : }
155 :
156 : /// Without modifying the availability of the node, convert the intended availability
157 : /// into a description of the transition.
158 282 : pub(crate) fn get_availability_transition(
159 282 : &self,
160 282 : availability: &NodeAvailability,
161 282 : ) -> AvailabilityTransition {
162 : use AvailabilityTransition::*;
163 : use NodeAvailability::*;
164 :
165 282 : match (&self.availability, availability) {
166 278 : (Offline, Active(_)) => ToActive,
167 2 : (Active(_), Offline) => ToOffline,
168 0 : (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
169 0 : (WarmingUp(_), Offline) => ToOffline,
170 0 : (WarmingUp(_), Active(_)) => ToActive,
171 0 : (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
172 2 : _ => Unchanged,
173 : }
174 282 : }
175 :
176 : /// Whether we may send API requests to this node.
177 278 : pub(crate) fn is_available(&self) -> bool {
178 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
179 : // a reference to the original Node's cancellation status. Checking both of these results
180 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
181 : // when we cloned it, or if the original Node instance's cancellation token was fired.
182 278 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
183 278 : }
184 :
185 : /// Is this node elegible to have work scheduled onto it?
186 285 : pub(crate) fn may_schedule(&self) -> MaySchedule {
187 285 : let utilization = match &self.availability {
188 283 : NodeAvailability::Active(u) => u.clone(),
189 2 : NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
190 : };
191 :
192 283 : match self.scheduling {
193 283 : NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
194 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
195 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
196 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
197 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
198 : }
199 285 : }
200 :
201 : #[allow(clippy::too_many_arguments)]
202 278 : pub(crate) fn new(
203 278 : id: NodeId,
204 278 : listen_http_addr: String,
205 278 : listen_http_port: u16,
206 278 : listen_https_port: Option<u16>,
207 278 : listen_pg_addr: String,
208 278 : listen_pg_port: u16,
209 278 : availability_zone_id: AvailabilityZone,
210 278 : use_https: bool,
211 278 : ) -> anyhow::Result<Self> {
212 278 : if use_https && listen_https_port.is_none() {
213 0 : anyhow::bail!(
214 0 : "cannot create node {id}: \
215 0 : https is enabled, but https port is not specified"
216 0 : );
217 278 : }
218 278 :
219 278 : Ok(Self {
220 278 : id,
221 278 : listen_http_addr,
222 278 : listen_http_port,
223 278 : listen_https_port,
224 278 : listen_pg_addr,
225 278 : listen_pg_port,
226 278 : scheduling: NodeSchedulingPolicy::Active,
227 278 : availability: NodeAvailability::Offline,
228 278 : availability_zone_id,
229 278 : use_https,
230 278 : cancel: CancellationToken::new(),
231 278 : })
232 278 : }
233 :
234 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
235 0 : NodePersistence {
236 0 : node_id: self.id.0 as i64,
237 0 : scheduling_policy: self.scheduling.into(),
238 0 : listen_http_addr: self.listen_http_addr.clone(),
239 0 : listen_http_port: self.listen_http_port as i32,
240 0 : listen_https_port: self.listen_https_port.map(|x| x as i32),
241 0 : listen_pg_addr: self.listen_pg_addr.clone(),
242 0 : listen_pg_port: self.listen_pg_port as i32,
243 0 : availability_zone_id: self.availability_zone_id.0.clone(),
244 0 : }
245 0 : }
246 :
247 0 : pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
248 0 : if use_https && np.listen_https_port.is_none() {
249 0 : anyhow::bail!(
250 0 : "cannot load node {} from persistent: \
251 0 : https is enabled, but https port is not specified",
252 0 : np.node_id,
253 0 : );
254 0 : }
255 0 :
256 0 : Ok(Self {
257 0 : id: NodeId(np.node_id as u64),
258 0 : // At startup we consider a node offline until proven otherwise.
259 0 : availability: NodeAvailability::Offline,
260 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
261 0 : .expect("Bad scheduling policy in DB"),
262 0 : listen_http_addr: np.listen_http_addr,
263 0 : listen_http_port: np.listen_http_port as u16,
264 0 : listen_https_port: np.listen_https_port.map(|x| x as u16),
265 0 : listen_pg_addr: np.listen_pg_addr,
266 0 : listen_pg_port: np.listen_pg_port as u16,
267 0 : availability_zone_id: AvailabilityZone(np.availability_zone_id),
268 0 : use_https,
269 0 : cancel: CancellationToken::new(),
270 0 : })
271 0 : }
272 :
273 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
274 : /// retry/backoff for retryable HTTP status codes.
275 : ///
276 : /// This will return None to indicate cancellation. Cancellation may happen from
277 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
278 : /// going offline).
279 : #[allow(clippy::too_many_arguments)]
280 0 : pub(crate) async fn with_client_retries<T, O, F>(
281 0 : &self,
282 0 : mut op: O,
283 0 : jwt: &Option<String>,
284 0 : ssl_ca_cert: &Option<Certificate>,
285 0 : warn_threshold: u32,
286 0 : max_retries: u32,
287 0 : timeout: Duration,
288 0 : cancel: &CancellationToken,
289 0 : ) -> Option<mgmt_api::Result<T>>
290 0 : where
291 0 : O: FnMut(PageserverClient) -> F,
292 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
293 0 : {
294 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
295 : use mgmt_api::Error::*;
296 0 : match e {
297 0 : SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
298 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
299 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
300 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
301 0 : ApiError(_, _) => true,
302 0 : Cancelled => true,
303 0 : CreateClient(_) => true,
304 : }
305 0 : }
306 :
307 : // TODO: refactor PageserverClient and with_client_retires (#11113).
308 0 : let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
309 0 : if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
310 0 : http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
311 0 : }
312 :
313 0 : let http_client = match http_client.build() {
314 0 : Ok(http_client) => http_client,
315 0 : Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
316 : };
317 :
318 0 : backoff::retry(
319 0 : || {
320 0 : let client = PageserverClient::from_client(
321 0 : self.get_id(),
322 0 : http_client.clone(),
323 0 : self.base_url(),
324 0 : jwt.as_deref(),
325 0 : );
326 0 :
327 0 : let node_cancel_fut = self.cancel.cancelled();
328 0 :
329 0 : let op_fut = op(client);
330 :
331 0 : async {
332 0 : tokio::select! {
333 0 : r = op_fut=> {r},
334 0 : _ = node_cancel_fut => {
335 0 : Err(mgmt_api::Error::Cancelled)
336 : }}
337 0 : }
338 0 : },
339 0 : is_fatal,
340 0 : warn_threshold,
341 0 : max_retries,
342 0 : &format!(
343 0 : "Call to node {} ({}) management API",
344 0 : self.id,
345 0 : self.base_url(),
346 0 : ),
347 0 : cancel,
348 0 : )
349 0 : .await
350 0 : }
351 :
352 : /// Generate the simplified API-friendly description of a node's state
353 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
354 0 : NodeDescribeResponse {
355 0 : id: self.id,
356 0 : availability: self.availability.clone().into(),
357 0 : scheduling: self.scheduling,
358 0 : availability_zone_id: self.availability_zone_id.0.clone(),
359 0 : listen_http_addr: self.listen_http_addr.clone(),
360 0 : listen_http_port: self.listen_http_port,
361 0 : listen_https_port: self.listen_https_port,
362 0 : listen_pg_addr: self.listen_pg_addr.clone(),
363 0 : listen_pg_port: self.listen_pg_port,
364 0 : }
365 0 : }
366 : }
367 :
368 : impl std::fmt::Display for Node {
369 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
371 0 : }
372 : }
373 :
374 : impl std::fmt::Debug for Node {
375 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
377 0 : }
378 : }
|