Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::{
4 : controller_api::{
5 : AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
6 : NodeSchedulingPolicy, TenantLocateResponseShard,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use pageserver_client::mgmt_api;
11 : use reqwest::StatusCode;
12 : use serde::Serialize;
13 : use tokio_util::sync::CancellationToken;
14 : use utils::{backoff, id::NodeId};
15 :
16 : use crate::{
17 : pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
18 : };
19 :
20 : /// Represents the in-memory description of a Node.
21 : ///
22 : /// Scheduling statistics are maintened separately in [`crate::scheduler`].
23 : ///
24 : /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
25 : /// implementation of serialization on this type is only for debug dumps.
26 : #[derive(Clone, Serialize)]
27 : pub(crate) struct Node {
28 : id: NodeId,
29 :
30 : availability: NodeAvailability,
31 : scheduling: NodeSchedulingPolicy,
32 :
33 : listen_http_addr: String,
34 : listen_http_port: u16,
35 :
36 : listen_pg_addr: String,
37 : listen_pg_port: u16,
38 :
39 : availability_zone_id: AvailabilityZone,
40 :
41 : // This cancellation token means "stop any RPCs in flight to this node, and don't start
42 : // any more". It is not related to process shutdown.
43 : #[serde(skip)]
44 : cancel: CancellationToken,
45 : }
46 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
///
/// Produced by `Node::get_availability_transition`, which classifies a (current, proposed)
/// pair of [`NodeAvailability`] states into one of these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AvailabilityTransition {
    /// Offline -> Active, or WarmingUp -> Active.
    ToActive,
    /// Active -> WarmingUp.
    ToWarmingUpFromActive,
    /// Offline -> WarmingUp.
    ToWarmingUpFromOffline,
    /// Active -> Offline, or WarmingUp -> Offline.
    ToOffline,
    /// The availability kind did not change (the new value may still carry an updated payload,
    /// e.g. fresh utilization for an `Active` node).
    Unchanged,
}
56 :
57 : impl Node {
58 0 : pub(crate) fn base_url(&self) -> String {
59 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
60 0 : }
61 :
62 236 : pub(crate) fn get_id(&self) -> NodeId {
63 236 : self.id
64 236 : }
65 :
66 : #[allow(unused)]
67 25433 : pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
68 25433 : &self.availability_zone_id
69 25433 : }
70 :
71 46 : pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
72 46 : self.scheduling
73 46 : }
74 :
75 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
76 0 : self.scheduling = scheduling
77 0 : }
78 :
79 : /// Does this registration request match `self`? This is used when deciding whether a registration
80 : /// request should be allowed to update an existing record with the same node ID.
81 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
82 0 : self.id == register_req.node_id
83 0 : && self.listen_http_addr == register_req.listen_http_addr
84 0 : && self.listen_http_port == register_req.listen_http_port
85 0 : && self.listen_pg_addr == register_req.listen_pg_addr
86 0 : && self.listen_pg_port == register_req.listen_pg_port
87 0 : && self.availability_zone_id == register_req.availability_zone_id
88 0 : }
89 :
90 : /// For a shard located on this node, populate a response object
91 : /// with this node's address information.
92 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
93 0 : TenantLocateResponseShard {
94 0 : shard_id,
95 0 : node_id: self.id,
96 0 : listen_http_addr: self.listen_http_addr.clone(),
97 0 : listen_http_port: self.listen_http_port,
98 0 : listen_pg_addr: self.listen_pg_addr.clone(),
99 0 : listen_pg_port: self.listen_pg_port,
100 0 : }
101 0 : }
102 :
103 0 : pub(crate) fn get_availability(&self) -> &NodeAvailability {
104 0 : &self.availability
105 0 : }
106 :
107 233 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
108 : use AvailabilityTransition::*;
109 : use NodeAvailability::WarmingUp;
110 :
111 233 : match self.get_availability_transition(&availability) {
112 230 : ToActive => {
113 230 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
114 230 : // users of previously-cloned copies of the node will still see the old cancellation
115 230 : // state. For example, Reconcilers in flight will have to complete and be spawned
116 230 : // again to realize that the node has become available.
117 230 : self.cancel = CancellationToken::new();
118 230 : }
119 1 : ToOffline | ToWarmingUpFromActive => {
120 1 : // Fire the node's cancellation token to cancel any in-flight API requests to it
121 1 : self.cancel.cancel();
122 1 : }
123 2 : Unchanged | ToWarmingUpFromOffline => {}
124 : }
125 :
126 233 : if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
127 0 : self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
128 233 : } else {
129 233 : self.availability = availability;
130 233 : }
131 233 : }
132 :
133 : /// Without modifying the availability of the node, convert the intended availability
134 : /// into a description of the transition.
135 233 : pub(crate) fn get_availability_transition(
136 233 : &self,
137 233 : availability: &NodeAvailability,
138 233 : ) -> AvailabilityTransition {
139 : use AvailabilityTransition::*;
140 : use NodeAvailability::*;
141 :
142 233 : match (&self.availability, availability) {
143 230 : (Offline, Active(_)) => ToActive,
144 1 : (Active(_), Offline) => ToOffline,
145 0 : (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
146 0 : (WarmingUp(_), Offline) => ToOffline,
147 0 : (WarmingUp(_), Active(_)) => ToActive,
148 0 : (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
149 2 : _ => Unchanged,
150 : }
151 233 : }
152 :
153 : /// Whether we may send API requests to this node.
154 230 : pub(crate) fn is_available(&self) -> bool {
155 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
156 : // a reference to the original Node's cancellation status. Checking both of these results
157 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
158 : // when we cloned it, or if the original Node instance's cancellation token was fired.
159 230 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
160 230 : }
161 :
162 : /// Is this node elegible to have work scheduled onto it?
163 282 : pub(crate) fn may_schedule(&self) -> MaySchedule {
164 282 : let utilization = match &self.availability {
165 281 : NodeAvailability::Active(u) => u.clone(),
166 1 : NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
167 : };
168 :
169 281 : match self.scheduling {
170 281 : NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
171 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
172 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
173 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
174 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
175 : }
176 282 : }
177 :
178 230 : pub(crate) fn new(
179 230 : id: NodeId,
180 230 : listen_http_addr: String,
181 230 : listen_http_port: u16,
182 230 : listen_pg_addr: String,
183 230 : listen_pg_port: u16,
184 230 : availability_zone_id: AvailabilityZone,
185 230 : ) -> Self {
186 230 : Self {
187 230 : id,
188 230 : listen_http_addr,
189 230 : listen_http_port,
190 230 : listen_pg_addr,
191 230 : listen_pg_port,
192 230 : scheduling: NodeSchedulingPolicy::Active,
193 230 : availability: NodeAvailability::Offline,
194 230 : availability_zone_id,
195 230 : cancel: CancellationToken::new(),
196 230 : }
197 230 : }
198 :
199 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
200 0 : NodePersistence {
201 0 : node_id: self.id.0 as i64,
202 0 : scheduling_policy: self.scheduling.into(),
203 0 : listen_http_addr: self.listen_http_addr.clone(),
204 0 : listen_http_port: self.listen_http_port as i32,
205 0 : listen_pg_addr: self.listen_pg_addr.clone(),
206 0 : listen_pg_port: self.listen_pg_port as i32,
207 0 : availability_zone_id: self.availability_zone_id.0.clone(),
208 0 : }
209 0 : }
210 :
211 0 : pub(crate) fn from_persistent(np: NodePersistence) -> Self {
212 0 : Self {
213 0 : id: NodeId(np.node_id as u64),
214 0 : // At startup we consider a node offline until proven otherwise.
215 0 : availability: NodeAvailability::Offline,
216 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
217 0 : .expect("Bad scheduling policy in DB"),
218 0 : listen_http_addr: np.listen_http_addr,
219 0 : listen_http_port: np.listen_http_port as u16,
220 0 : listen_pg_addr: np.listen_pg_addr,
221 0 : listen_pg_port: np.listen_pg_port as u16,
222 0 : availability_zone_id: AvailabilityZone(np.availability_zone_id),
223 0 : cancel: CancellationToken::new(),
224 0 : }
225 0 : }
226 :
227 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
228 : /// retry/backoff for retryable HTTP status codes.
229 : ///
230 : /// This will return None to indicate cancellation. Cancellation may happen from
231 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
232 : /// going offline).
233 0 : pub(crate) async fn with_client_retries<T, O, F>(
234 0 : &self,
235 0 : mut op: O,
236 0 : jwt: &Option<String>,
237 0 : warn_threshold: u32,
238 0 : max_retries: u32,
239 0 : timeout: Duration,
240 0 : cancel: &CancellationToken,
241 0 : ) -> Option<mgmt_api::Result<T>>
242 0 : where
243 0 : O: FnMut(PageserverClient) -> F,
244 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
245 0 : {
246 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
247 : use mgmt_api::Error::*;
248 0 : match e {
249 0 : SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
250 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
251 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
252 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
253 0 : ApiError(_, _) => true,
254 0 : Cancelled => true,
255 : }
256 0 : }
257 :
258 0 : backoff::retry(
259 0 : || {
260 0 : let http_client = reqwest::ClientBuilder::new()
261 0 : .timeout(timeout)
262 0 : .build()
263 0 : .expect("Failed to construct HTTP client");
264 0 :
265 0 : let client = PageserverClient::from_client(
266 0 : self.get_id(),
267 0 : http_client,
268 0 : self.base_url(),
269 0 : jwt.as_deref(),
270 0 : );
271 0 :
272 0 : let node_cancel_fut = self.cancel.cancelled();
273 0 :
274 0 : let op_fut = op(client);
275 :
276 0 : async {
277 0 : tokio::select! {
278 0 : r = op_fut=> {r},
279 0 : _ = node_cancel_fut => {
280 0 : Err(mgmt_api::Error::Cancelled)
281 : }}
282 0 : }
283 0 : },
284 0 : is_fatal,
285 0 : warn_threshold,
286 0 : max_retries,
287 0 : &format!(
288 0 : "Call to node {} ({}:{}) management API",
289 0 : self.id, self.listen_http_addr, self.listen_http_port
290 0 : ),
291 0 : cancel,
292 0 : )
293 0 : .await
294 0 : }
295 :
296 : /// Generate the simplified API-friendly description of a node's state
297 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
298 0 : NodeDescribeResponse {
299 0 : id: self.id,
300 0 : availability: self.availability.clone().into(),
301 0 : scheduling: self.scheduling,
302 0 : listen_http_addr: self.listen_http_addr.clone(),
303 0 : listen_http_port: self.listen_http_port,
304 0 : listen_pg_addr: self.listen_pg_addr.clone(),
305 0 : listen_pg_port: self.listen_pg_port,
306 0 : }
307 0 : }
308 : }
309 :
310 : impl std::fmt::Display for Node {
311 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
313 0 : }
314 : }
315 :
316 : impl std::fmt::Debug for Node {
317 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
319 0 : }
320 : }
|