Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::{
4 : controller_api::{
5 : NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
6 : TenantLocateResponseShard,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use pageserver_client::mgmt_api;
11 : use reqwest::StatusCode;
12 : use serde::Serialize;
13 : use tokio_util::sync::CancellationToken;
14 : use utils::{backoff, id::NodeId};
15 :
16 : use crate::{
17 : pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
18 : };
19 :
20 : /// Represents the in-memory description of a Node.
21 : ///
22 : /// Scheduling statistics are maintened separately in [`crate::scheduler`].
23 : ///
24 : /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
25 : /// implementation of serialization on this type is only for debug dumps.
26 : #[derive(Clone, Serialize)]
27 : pub(crate) struct Node {
28 : id: NodeId,
29 :
30 : availability: NodeAvailability,
31 : scheduling: NodeSchedulingPolicy,
32 :
33 : listen_http_addr: String,
34 : listen_http_port: u16,
35 :
36 : listen_pg_addr: String,
37 : listen_pg_port: u16,
38 :
39 : // This cancellation token means "stop any RPCs in flight to this node, and don't start
40 : // any more". It is not related to process shutdown.
41 : #[serde(skip)]
42 : cancel: CancellationToken,
43 : }
44 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AvailabilityTransition {
    /// The node became `Active`, coming from `Offline` or `WarmingUp`.
    ToActive,
    /// The node went from `Active` to `WarmingUp`.
    ToWarmingUpFromActive,
    /// The node went from `Offline` to `WarmingUp`.
    ToWarmingUpFromOffline,
    /// The node became `Offline`, coming from `Active` or `WarmingUp`.
    ToOffline,
    /// Any other combination, including staying in the same availability state.
    Unchanged,
}
54 :
55 : impl Node {
56 0 : pub(crate) fn base_url(&self) -> String {
57 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
58 0 : }
59 :
60 52 : pub(crate) fn get_id(&self) -> NodeId {
61 52 : self.id
62 52 : }
63 :
64 60 : pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
65 60 : self.scheduling
66 60 : }
67 :
68 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
69 0 : self.scheduling = scheduling
70 0 : }
71 :
72 : /// Does this registration request match `self`? This is used when deciding whether a registration
73 : /// request should be allowed to update an existing record with the same node ID.
74 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
75 0 : self.id == register_req.node_id
76 0 : && self.listen_http_addr == register_req.listen_http_addr
77 0 : && self.listen_http_port == register_req.listen_http_port
78 0 : && self.listen_pg_addr == register_req.listen_pg_addr
79 0 : && self.listen_pg_port == register_req.listen_pg_port
80 0 : }
81 :
82 : /// For a shard located on this node, populate a response object
83 : /// with this node's address information.
84 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
85 0 : TenantLocateResponseShard {
86 0 : shard_id,
87 0 : node_id: self.id,
88 0 : listen_http_addr: self.listen_http_addr.clone(),
89 0 : listen_http_port: self.listen_http_port,
90 0 : listen_pg_addr: self.listen_pg_addr.clone(),
91 0 : listen_pg_port: self.listen_pg_port,
92 0 : }
93 0 : }
94 :
95 0 : pub(crate) fn get_availability(&self) -> NodeAvailability {
96 0 : self.availability
97 0 : }
98 :
99 46 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
100 46 : use AvailabilityTransition::*;
101 46 : use NodeAvailability::WarmingUp;
102 46 :
103 46 : match self.get_availability_transition(availability) {
104 44 : ToActive => {
105 44 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
106 44 : // users of previously-cloned copies of the node will still see the old cancellation
107 44 : // state. For example, Reconcilers in flight will have to complete and be spawned
108 44 : // again to realize that the node has become available.
109 44 : self.cancel = CancellationToken::new();
110 44 : }
111 2 : ToOffline | ToWarmingUpFromActive => {
112 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
113 2 : self.cancel.cancel();
114 2 : }
115 0 : Unchanged | ToWarmingUpFromOffline => {}
116 : }
117 :
118 46 : if let (WarmingUp(crnt), WarmingUp(proposed)) = (self.availability, availability) {
119 0 : self.availability = WarmingUp(std::cmp::max(crnt, proposed));
120 46 : } else {
121 46 : self.availability = availability;
122 46 : }
123 46 : }
124 :
125 : /// Without modifying the availability of the node, convert the intended availability
126 : /// into a description of the transition.
127 46 : pub(crate) fn get_availability_transition(
128 46 : &self,
129 46 : availability: NodeAvailability,
130 46 : ) -> AvailabilityTransition {
131 46 : use AvailabilityTransition::*;
132 46 : use NodeAvailability::*;
133 46 :
134 46 : match (self.availability, availability) {
135 44 : (Offline, Active(_)) => ToActive,
136 2 : (Active(_), Offline) => ToOffline,
137 0 : (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
138 0 : (WarmingUp(_), Offline) => ToOffline,
139 0 : (WarmingUp(_), Active(_)) => ToActive,
140 0 : (Offline, WarmingUp(_)) => ToWarmingUpFromOffline,
141 0 : _ => Unchanged,
142 : }
143 46 : }
144 :
145 : /// Whether we may send API requests to this node.
146 44 : pub(crate) fn is_available(&self) -> bool {
147 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
148 : // a reference to the original Node's cancellation status. Checking both of these results
149 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
150 : // when we cloned it, or if the original Node instance's cancellation token was fired.
151 44 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
152 44 : }
153 :
154 : /// Is this node elegible to have work scheduled onto it?
155 112 : pub(crate) fn may_schedule(&self) -> MaySchedule {
156 112 : let score = match self.availability {
157 110 : NodeAvailability::Active(score) => score,
158 2 : NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
159 : };
160 :
161 110 : match self.scheduling {
162 110 : NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
163 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
164 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
165 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
166 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
167 : }
168 112 : }
169 :
170 44 : pub(crate) fn new(
171 44 : id: NodeId,
172 44 : listen_http_addr: String,
173 44 : listen_http_port: u16,
174 44 : listen_pg_addr: String,
175 44 : listen_pg_port: u16,
176 44 : ) -> Self {
177 44 : Self {
178 44 : id,
179 44 : listen_http_addr,
180 44 : listen_http_port,
181 44 : listen_pg_addr,
182 44 : listen_pg_port,
183 44 : scheduling: NodeSchedulingPolicy::Active,
184 44 : availability: NodeAvailability::Offline,
185 44 : cancel: CancellationToken::new(),
186 44 : }
187 44 : }
188 :
189 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
190 0 : NodePersistence {
191 0 : node_id: self.id.0 as i64,
192 0 : scheduling_policy: self.scheduling.into(),
193 0 : listen_http_addr: self.listen_http_addr.clone(),
194 0 : listen_http_port: self.listen_http_port as i32,
195 0 : listen_pg_addr: self.listen_pg_addr.clone(),
196 0 : listen_pg_port: self.listen_pg_port as i32,
197 0 : }
198 0 : }
199 :
200 0 : pub(crate) fn from_persistent(np: NodePersistence) -> Self {
201 0 : Self {
202 0 : id: NodeId(np.node_id as u64),
203 0 : // At startup we consider a node offline until proven otherwise.
204 0 : availability: NodeAvailability::Offline,
205 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
206 0 : .expect("Bad scheduling policy in DB"),
207 0 : listen_http_addr: np.listen_http_addr,
208 0 : listen_http_port: np.listen_http_port as u16,
209 0 : listen_pg_addr: np.listen_pg_addr,
210 0 : listen_pg_port: np.listen_pg_port as u16,
211 0 : cancel: CancellationToken::new(),
212 0 : }
213 0 : }
214 :
215 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
216 : /// retry/backoff for retryable HTTP status codes.
217 : ///
218 : /// This will return None to indicate cancellation. Cancellation may happen from
219 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
220 : /// going offline).
221 0 : pub(crate) async fn with_client_retries<T, O, F>(
222 0 : &self,
223 0 : mut op: O,
224 0 : jwt: &Option<String>,
225 0 : warn_threshold: u32,
226 0 : max_retries: u32,
227 0 : timeout: Duration,
228 0 : cancel: &CancellationToken,
229 0 : ) -> Option<mgmt_api::Result<T>>
230 0 : where
231 0 : O: FnMut(PageserverClient) -> F,
232 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
233 0 : {
234 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
235 0 : use mgmt_api::Error::*;
236 0 : match e {
237 0 : SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
238 0 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
239 0 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
240 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
241 0 : ApiError(_, _) => true,
242 0 : Cancelled => true,
243 0 : }
244 0 : }
245 0 :
246 0 : backoff::retry(
247 0 : || {
248 0 : let http_client = reqwest::ClientBuilder::new()
249 0 : .timeout(timeout)
250 0 : .build()
251 0 : .expect("Failed to construct HTTP client");
252 0 :
253 0 : let client = PageserverClient::from_client(
254 0 : self.get_id(),
255 0 : http_client,
256 0 : self.base_url(),
257 0 : jwt.as_deref(),
258 0 : );
259 0 :
260 0 : let node_cancel_fut = self.cancel.cancelled();
261 0 :
262 0 : let op_fut = op(client);
263 :
264 0 : async {
265 : tokio::select! {
266 : r = op_fut=> {r},
267 : _ = node_cancel_fut => {
268 : Err(mgmt_api::Error::Cancelled)
269 : }}
270 0 : }
271 0 : },
272 0 : is_fatal,
273 0 : warn_threshold,
274 0 : max_retries,
275 0 : &format!(
276 0 : "Call to node {} ({}:{}) management API",
277 0 : self.id, self.listen_http_addr, self.listen_http_port
278 0 : ),
279 0 : cancel,
280 0 : )
281 0 : .await
282 0 : }
283 :
284 : /// Generate the simplified API-friendly description of a node's state
285 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
286 0 : NodeDescribeResponse {
287 0 : id: self.id,
288 0 : availability: self.availability.into(),
289 0 : scheduling: self.scheduling,
290 0 : listen_http_addr: self.listen_http_addr.clone(),
291 0 : listen_http_port: self.listen_http_port,
292 0 : listen_pg_addr: self.listen_pg_addr.clone(),
293 0 : listen_pg_port: self.listen_pg_port,
294 0 : }
295 0 : }
296 : }
297 :
298 : impl std::fmt::Display for Node {
299 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
301 0 : }
302 : }
303 :
304 : impl std::fmt::Debug for Node {
305 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
307 0 : }
308 : }
|