Line data Source code
1 : use std::str::FromStr;
2 : use std::time::Duration;
3 :
4 : use anyhow::anyhow;
5 : use pageserver_api::controller_api::{
6 : AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
7 : NodeSchedulingPolicy, TenantLocateResponseShard,
8 : };
9 : use pageserver_api::shard::TenantShardId;
10 : use pageserver_client::mgmt_api;
11 : use reqwest::StatusCode;
12 : use serde::Serialize;
13 : use tokio_util::sync::CancellationToken;
14 : use utils::backoff;
15 : use utils::id::NodeId;
16 :
17 : use crate::pageserver_client::PageserverClient;
18 : use crate::persistence::NodePersistence;
19 : use crate::scheduler::MaySchedule;
20 :
21 : /// Represents the in-memory description of a Node.
22 : ///
23 : /// Scheduling statistics are maintened separately in [`crate::scheduler`].
24 : ///
25 : /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
26 : /// implementation of serialization on this type is only for debug dumps.
27 : #[derive(Clone, Serialize)]
28 : pub(crate) struct Node {
29 : id: NodeId,
30 :
31 : availability: NodeAvailability,
32 : scheduling: NodeSchedulingPolicy,
33 :
34 : listen_http_addr: String,
35 : listen_http_port: u16,
36 : listen_https_port: Option<u16>,
37 :
38 : listen_pg_addr: String,
39 : listen_pg_port: u16,
40 :
41 : availability_zone_id: AvailabilityZone,
42 :
43 : // Flag from storcon's config to use https for pageserver admin API.
44 : // Invariant: if |true|, listen_https_port should contain a value.
45 : use_https: bool,
46 : // This cancellation token means "stop any RPCs in flight to this node, and don't start
47 : // any more". It is not related to process shutdown.
48 : #[serde(skip)]
49 : cancel: CancellationToken,
50 : }
51 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
///
/// Computed from the (current, proposed) availability pair without mutating the node.
pub(crate) enum AvailabilityTransition {
    /// The node becomes `Active`, coming from `Offline` or `WarmingUp`.
    ToActive,
    /// The node goes from `Active` to `WarmingUp`.
    ToWarmingUpFromActive,
    /// The node goes from `Offline` to `WarmingUp`.
    ToWarmingUpFromOffline,
    /// The node becomes `Offline`, coming from `Active` or `WarmingUp`.
    ToOffline,
    /// The availability kind stays the same (e.g. `Active` -> `Active`), even if the
    /// payload carried by the variant differs.
    Unchanged,
}
61 :
62 : impl Node {
63 0 : pub(crate) fn base_url(&self) -> String {
64 0 : if self.use_https {
65 0 : format!(
66 0 : "https://{}:{}",
67 0 : self.listen_http_addr,
68 0 : self.listen_https_port
69 0 : .expect("https port should be specified if use_https is on")
70 0 : )
71 : } else {
72 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
73 : }
74 0 : }
75 :
76 279 : pub(crate) fn get_id(&self) -> NodeId {
77 279 : self.id
78 279 : }
79 :
80 : #[allow(unused)]
81 25475 : pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
82 25475 : &self.availability_zone_id
83 25475 : }
84 :
85 0 : pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
86 0 : self.scheduling
87 0 : }
88 :
89 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
90 0 : self.scheduling = scheduling
91 0 : }
92 :
93 : /// Does this registration request match `self`? This is used when deciding whether a registration
94 : /// request should be allowed to update an existing record with the same node ID.
95 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
96 0 : self.id == register_req.node_id
97 0 : && self.listen_http_addr == register_req.listen_http_addr
98 0 : && self.listen_http_port == register_req.listen_http_port
99 : // Note: listen_https_port may change. See [`Self::need_update`] for mode details.
100 : // && self.listen_https_port == register_req.listen_https_port
101 0 : && self.listen_pg_addr == register_req.listen_pg_addr
102 0 : && self.listen_pg_port == register_req.listen_pg_port
103 0 : && self.availability_zone_id == register_req.availability_zone_id
104 0 : }
105 :
106 : // Do we need to update an existing record in DB on this registration request?
107 0 : pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
108 0 : // listen_https_port is checked here because it may change during migration to https.
109 0 : // After migration, this check may be moved to registration_match.
110 0 : self.listen_https_port != register_req.listen_https_port
111 0 : }
112 :
113 : /// For a shard located on this node, populate a response object
114 : /// with this node's address information.
115 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
116 0 : TenantLocateResponseShard {
117 0 : shard_id,
118 0 : node_id: self.id,
119 0 : listen_http_addr: self.listen_http_addr.clone(),
120 0 : listen_http_port: self.listen_http_port,
121 0 : listen_https_port: self.listen_https_port,
122 0 : listen_pg_addr: self.listen_pg_addr.clone(),
123 0 : listen_pg_port: self.listen_pg_port,
124 0 : }
125 0 : }
126 :
127 0 : pub(crate) fn get_availability(&self) -> &NodeAvailability {
128 0 : &self.availability
129 0 : }
130 :
131 276 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
132 : use AvailabilityTransition::*;
133 : use NodeAvailability::WarmingUp;
134 :
135 276 : match self.get_availability_transition(&availability) {
136 272 : ToActive => {
137 272 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
138 272 : // users of previously-cloned copies of the node will still see the old cancellation
139 272 : // state. For example, Reconcilers in flight will have to complete and be spawned
140 272 : // again to realize that the node has become available.
141 272 : self.cancel = CancellationToken::new();
142 272 : }
143 2 : ToOffline | ToWarmingUpFromActive => {
144 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
145 2 : self.cancel.cancel();
146 2 : }
147 2 : Unchanged | ToWarmingUpFromOffline => {}
148 : }
149 :
150 276 : if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
151 0 : self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
152 276 : } else {
153 276 : self.availability = availability;
154 276 : }
155 276 : }
156 :
157 : /// Without modifying the availability of the node, convert the intended availability
158 : /// into a description of the transition.
159 276 : pub(crate) fn get_availability_transition(
160 276 : &self,
161 276 : availability: &NodeAvailability,
162 276 : ) -> AvailabilityTransition {
163 : use AvailabilityTransition::*;
164 : use NodeAvailability::*;
165 :
166 276 : match (&self.availability, availability) {
167 272 : (Offline, Active(_)) => ToActive,
168 2 : (Active(_), Offline) => ToOffline,
169 0 : (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
170 0 : (WarmingUp(_), Offline) => ToOffline,
171 0 : (WarmingUp(_), Active(_)) => ToActive,
172 0 : (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
173 2 : _ => Unchanged,
174 : }
175 276 : }
176 :
177 : /// Whether we may send API requests to this node.
178 272 : pub(crate) fn is_available(&self) -> bool {
179 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
180 : // a reference to the original Node's cancellation status. Checking both of these results
181 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
182 : // when we cloned it, or if the original Node instance's cancellation token was fired.
183 272 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
184 272 : }
185 :
186 : /// Is this node elegible to have work scheduled onto it?
187 279 : pub(crate) fn may_schedule(&self) -> MaySchedule {
188 279 : let utilization = match &self.availability {
189 277 : NodeAvailability::Active(u) => u.clone(),
190 2 : NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
191 : };
192 :
193 277 : match self.scheduling {
194 277 : NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
195 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
196 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
197 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
198 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
199 : }
200 279 : }
201 :
202 : #[allow(clippy::too_many_arguments)]
203 272 : pub(crate) fn new(
204 272 : id: NodeId,
205 272 : listen_http_addr: String,
206 272 : listen_http_port: u16,
207 272 : listen_https_port: Option<u16>,
208 272 : listen_pg_addr: String,
209 272 : listen_pg_port: u16,
210 272 : availability_zone_id: AvailabilityZone,
211 272 : use_https: bool,
212 272 : ) -> anyhow::Result<Self> {
213 272 : if use_https && listen_https_port.is_none() {
214 0 : return Err(anyhow!("https is enabled, but node has no https port"));
215 272 : }
216 272 :
217 272 : Ok(Self {
218 272 : id,
219 272 : listen_http_addr,
220 272 : listen_http_port,
221 272 : listen_https_port,
222 272 : listen_pg_addr,
223 272 : listen_pg_port,
224 272 : scheduling: NodeSchedulingPolicy::Active,
225 272 : availability: NodeAvailability::Offline,
226 272 : availability_zone_id,
227 272 : use_https,
228 272 : cancel: CancellationToken::new(),
229 272 : })
230 272 : }
231 :
232 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
233 0 : NodePersistence {
234 0 : node_id: self.id.0 as i64,
235 0 : scheduling_policy: self.scheduling.into(),
236 0 : listen_http_addr: self.listen_http_addr.clone(),
237 0 : listen_http_port: self.listen_http_port as i32,
238 0 : listen_https_port: self.listen_https_port.map(|x| x as i32),
239 0 : listen_pg_addr: self.listen_pg_addr.clone(),
240 0 : listen_pg_port: self.listen_pg_port as i32,
241 0 : availability_zone_id: self.availability_zone_id.0.clone(),
242 0 : }
243 0 : }
244 :
245 0 : pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
246 0 : if use_https && np.listen_https_port.is_none() {
247 0 : return Err(anyhow!("https is enabled, but node has no https port"));
248 0 : }
249 0 :
250 0 : Ok(Self {
251 0 : id: NodeId(np.node_id as u64),
252 0 : // At startup we consider a node offline until proven otherwise.
253 0 : availability: NodeAvailability::Offline,
254 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
255 0 : .expect("Bad scheduling policy in DB"),
256 0 : listen_http_addr: np.listen_http_addr,
257 0 : listen_http_port: np.listen_http_port as u16,
258 0 : listen_https_port: np.listen_https_port.map(|x| x as u16),
259 0 : listen_pg_addr: np.listen_pg_addr,
260 0 : listen_pg_port: np.listen_pg_port as u16,
261 0 : availability_zone_id: AvailabilityZone(np.availability_zone_id),
262 0 : use_https,
263 0 : cancel: CancellationToken::new(),
264 0 : })
265 0 : }
266 :
267 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
268 : /// retry/backoff for retryable HTTP status codes.
269 : ///
270 : /// This will return None to indicate cancellation. Cancellation may happen from
271 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
272 : /// going offline).
273 0 : pub(crate) async fn with_client_retries<T, O, F>(
274 0 : &self,
275 0 : mut op: O,
276 0 : jwt: &Option<String>,
277 0 : warn_threshold: u32,
278 0 : max_retries: u32,
279 0 : timeout: Duration,
280 0 : cancel: &CancellationToken,
281 0 : ) -> Option<mgmt_api::Result<T>>
282 0 : where
283 0 : O: FnMut(PageserverClient) -> F,
284 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
285 0 : {
286 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
287 : use mgmt_api::Error::*;
288 0 : match e {
289 0 : SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
290 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
291 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
292 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
293 0 : ApiError(_, _) => true,
294 0 : Cancelled => true,
295 : }
296 0 : }
297 :
298 0 : backoff::retry(
299 0 : || {
300 0 : let http_client = reqwest::ClientBuilder::new()
301 0 : .timeout(timeout)
302 0 : .build()
303 0 : .expect("Failed to construct HTTP client");
304 0 :
305 0 : let client = PageserverClient::from_client(
306 0 : self.get_id(),
307 0 : http_client,
308 0 : self.base_url(),
309 0 : jwt.as_deref(),
310 0 : );
311 0 :
312 0 : let node_cancel_fut = self.cancel.cancelled();
313 0 :
314 0 : let op_fut = op(client);
315 :
316 0 : async {
317 0 : tokio::select! {
318 0 : r = op_fut=> {r},
319 0 : _ = node_cancel_fut => {
320 0 : Err(mgmt_api::Error::Cancelled)
321 : }}
322 0 : }
323 0 : },
324 0 : is_fatal,
325 0 : warn_threshold,
326 0 : max_retries,
327 0 : &format!(
328 0 : "Call to node {} ({}) management API",
329 0 : self.id,
330 0 : self.base_url(),
331 0 : ),
332 0 : cancel,
333 0 : )
334 0 : .await
335 0 : }
336 :
337 : /// Generate the simplified API-friendly description of a node's state
338 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
339 0 : NodeDescribeResponse {
340 0 : id: self.id,
341 0 : availability: self.availability.clone().into(),
342 0 : scheduling: self.scheduling,
343 0 : availability_zone_id: self.availability_zone_id.0.clone(),
344 0 : listen_http_addr: self.listen_http_addr.clone(),
345 0 : listen_http_port: self.listen_http_port,
346 0 : listen_https_port: self.listen_https_port,
347 0 : listen_pg_addr: self.listen_pg_addr.clone(),
348 0 : listen_pg_port: self.listen_pg_port,
349 0 : }
350 0 : }
351 : }
352 :
353 : impl std::fmt::Display for Node {
354 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
356 0 : }
357 : }
358 :
359 : impl std::fmt::Debug for Node {
360 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
362 0 : }
363 : }
|