Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::{
4 : controller_api::{
5 : NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
6 : TenantLocateResponseShard, UtilizationScore,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use pageserver_client::mgmt_api;
11 : use reqwest::StatusCode;
12 : use serde::Serialize;
13 : use tokio_util::sync::CancellationToken;
14 : use utils::{backoff, id::NodeId};
15 :
16 : use crate::{
17 : pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
18 : };
19 :
20 : /// Represents the in-memory description of a Node.
21 : ///
22 : /// Scheduling statistics are maintened separately in [`crate::scheduler`].
23 : ///
24 : /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
25 : /// implementation of serialization on this type is only for debug dumps.
26 : #[derive(Clone, Serialize)]
27 : pub(crate) struct Node {
28 : id: NodeId,
29 :
30 : availability: NodeAvailability,
31 : scheduling: NodeSchedulingPolicy,
32 :
33 : listen_http_addr: String,
34 : listen_http_port: u16,
35 :
36 : listen_pg_addr: String,
37 : listen_pg_port: u16,
38 :
39 : // This cancellation token means "stop any RPCs in flight to this node, and don't start
40 : // any more". It is not related to process shutdown.
41 : #[serde(skip)]
42 : cancel: CancellationToken,
43 : }
44 :
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
///
/// `Node::get_availability_transition` computes a transition without applying it, so callers
/// can inspect (and compare) the outcome before committing to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AvailabilityTransition {
    /// The node becomes usable: `Node::set_availability` gives it a fresh, un-cancelled token.
    ToActive,
    /// The node becomes unusable: `Node::set_availability` fires its cancellation token.
    ToOffline,
    /// No transition that requires touching the cancellation token.
    Unchanged,
}
52 :
53 : impl Node {
54 0 : pub(crate) fn base_url(&self) -> String {
55 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
56 0 : }
57 :
58 52 : pub(crate) fn get_id(&self) -> NodeId {
59 52 : self.id
60 52 : }
61 :
62 60 : pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
63 60 : self.scheduling
64 60 : }
65 :
66 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
67 0 : self.scheduling = scheduling
68 0 : }
69 :
70 : /// Does this registration request match `self`? This is used when deciding whether a registration
71 : /// request should be allowed to update an existing record with the same node ID.
72 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
73 0 : self.id == register_req.node_id
74 0 : && self.listen_http_addr == register_req.listen_http_addr
75 0 : && self.listen_http_port == register_req.listen_http_port
76 0 : && self.listen_pg_addr == register_req.listen_pg_addr
77 0 : && self.listen_pg_port == register_req.listen_pg_port
78 0 : }
79 :
80 : /// For a shard located on this node, populate a response object
81 : /// with this node's address information.
82 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
83 0 : TenantLocateResponseShard {
84 0 : shard_id,
85 0 : node_id: self.id,
86 0 : listen_http_addr: self.listen_http_addr.clone(),
87 0 : listen_http_port: self.listen_http_port,
88 0 : listen_pg_addr: self.listen_pg_addr.clone(),
89 0 : listen_pg_port: self.listen_pg_port,
90 0 : }
91 0 : }
92 :
93 46 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
94 46 : match self.get_availability_transition(availability) {
95 44 : AvailabilityTransition::ToActive => {
96 44 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
97 44 : // users of previously-cloned copies of the node will still see the old cancellation
98 44 : // state. For example, Reconcilers in flight will have to complete and be spawned
99 44 : // again to realize that the node has become available.
100 44 : self.cancel = CancellationToken::new();
101 44 : }
102 2 : AvailabilityTransition::ToOffline => {
103 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
104 2 : self.cancel.cancel();
105 2 : }
106 0 : AvailabilityTransition::Unchanged => {}
107 : }
108 46 : self.availability = availability;
109 46 : }
110 :
111 : /// Without modifying the availability of the node, convert the intended availability
112 : /// into a description of the transition.
113 46 : pub(crate) fn get_availability_transition(
114 46 : &self,
115 46 : availability: NodeAvailability,
116 46 : ) -> AvailabilityTransition {
117 46 : use AvailabilityTransition::*;
118 46 : use NodeAvailability::*;
119 46 :
120 46 : match (self.availability, availability) {
121 44 : (Offline, Active(_)) => ToActive,
122 2 : (Active(_), Offline) => ToOffline,
123 : // Consider the case when the storage controller handles the re-attach of a node
124 : // before the heartbeats detect that the node is back online. We still need
125 : // [`Service::node_configure`] to attempt reconciliations for shards with an
126 : // unknown observed location.
127 : // The unsavoury match arm below handles this situation.
128 0 : (Active(lhs), Active(rhs))
129 0 : if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
130 0 : {
131 0 : ToActive
132 : }
133 0 : _ => Unchanged,
134 : }
135 46 : }
136 :
137 : /// Whether we may send API requests to this node.
138 44 : pub(crate) fn is_available(&self) -> bool {
139 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
140 : // a reference to the original Node's cancellation status. Checking both of these results
141 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
142 : // when we cloned it, or if the original Node instance's cancellation token was fired.
143 44 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
144 44 : }
145 :
146 : /// Is this node elegible to have work scheduled onto it?
147 112 : pub(crate) fn may_schedule(&self) -> MaySchedule {
148 112 : let score = match self.availability {
149 110 : NodeAvailability::Active(score) => score,
150 2 : NodeAvailability::Offline => return MaySchedule::No,
151 : };
152 :
153 110 : match self.scheduling {
154 110 : NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
155 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
156 0 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
157 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
158 0 : NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
159 : }
160 112 : }
161 :
162 44 : pub(crate) fn new(
163 44 : id: NodeId,
164 44 : listen_http_addr: String,
165 44 : listen_http_port: u16,
166 44 : listen_pg_addr: String,
167 44 : listen_pg_port: u16,
168 44 : ) -> Self {
169 44 : Self {
170 44 : id,
171 44 : listen_http_addr,
172 44 : listen_http_port,
173 44 : listen_pg_addr,
174 44 : listen_pg_port,
175 44 : scheduling: NodeSchedulingPolicy::Active,
176 44 : availability: NodeAvailability::Offline,
177 44 : cancel: CancellationToken::new(),
178 44 : }
179 44 : }
180 :
181 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
182 0 : NodePersistence {
183 0 : node_id: self.id.0 as i64,
184 0 : scheduling_policy: self.scheduling.into(),
185 0 : listen_http_addr: self.listen_http_addr.clone(),
186 0 : listen_http_port: self.listen_http_port as i32,
187 0 : listen_pg_addr: self.listen_pg_addr.clone(),
188 0 : listen_pg_port: self.listen_pg_port as i32,
189 0 : }
190 0 : }
191 :
192 0 : pub(crate) fn from_persistent(np: NodePersistence) -> Self {
193 0 : Self {
194 0 : id: NodeId(np.node_id as u64),
195 0 : // At startup we consider a node offline until proven otherwise.
196 0 : availability: NodeAvailability::Offline,
197 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
198 0 : .expect("Bad scheduling policy in DB"),
199 0 : listen_http_addr: np.listen_http_addr,
200 0 : listen_http_port: np.listen_http_port as u16,
201 0 : listen_pg_addr: np.listen_pg_addr,
202 0 : listen_pg_port: np.listen_pg_port as u16,
203 0 : cancel: CancellationToken::new(),
204 0 : }
205 0 : }
206 :
207 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
208 : /// retry/backoff for retryable HTTP status codes.
209 : ///
210 : /// This will return None to indicate cancellation. Cancellation may happen from
211 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
212 : /// going offline).
213 0 : pub(crate) async fn with_client_retries<T, O, F>(
214 0 : &self,
215 0 : mut op: O,
216 0 : jwt: &Option<String>,
217 0 : warn_threshold: u32,
218 0 : max_retries: u32,
219 0 : timeout: Duration,
220 0 : cancel: &CancellationToken,
221 0 : ) -> Option<mgmt_api::Result<T>>
222 0 : where
223 0 : O: FnMut(PageserverClient) -> F,
224 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
225 0 : {
226 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
227 0 : use mgmt_api::Error::*;
228 0 : match e {
229 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
230 0 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
231 0 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
232 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
233 0 : ApiError(_, _) => true,
234 0 : Cancelled => true,
235 0 : }
236 0 : }
237 0 :
238 0 : backoff::retry(
239 0 : || {
240 0 : let http_client = reqwest::ClientBuilder::new()
241 0 : .timeout(timeout)
242 0 : .build()
243 0 : .expect("Failed to construct HTTP client");
244 0 :
245 0 : let client = PageserverClient::from_client(
246 0 : self.get_id(),
247 0 : http_client,
248 0 : self.base_url(),
249 0 : jwt.as_deref(),
250 0 : );
251 0 :
252 0 : let node_cancel_fut = self.cancel.cancelled();
253 0 :
254 0 : let op_fut = op(client);
255 :
256 0 : async {
257 : tokio::select! {
258 : r = op_fut=> {r},
259 : _ = node_cancel_fut => {
260 : Err(mgmt_api::Error::Cancelled)
261 : }}
262 0 : }
263 0 : },
264 0 : is_fatal,
265 0 : warn_threshold,
266 0 : max_retries,
267 0 : &format!(
268 0 : "Call to node {} ({}:{}) management API",
269 0 : self.id, self.listen_http_addr, self.listen_http_port
270 0 : ),
271 0 : cancel,
272 0 : )
273 0 : .await
274 0 : }
275 :
276 : /// Generate the simplified API-friendly description of a node's state
277 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
278 0 : NodeDescribeResponse {
279 0 : id: self.id,
280 0 : availability: self.availability.into(),
281 0 : scheduling: self.scheduling,
282 0 : listen_http_addr: self.listen_http_addr.clone(),
283 0 : listen_http_port: self.listen_http_port,
284 0 : listen_pg_addr: self.listen_pg_addr.clone(),
285 0 : listen_pg_port: self.listen_pg_port,
286 0 : }
287 0 : }
288 : }
289 :
290 : impl std::fmt::Display for Node {
291 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
293 0 : }
294 : }
295 :
296 : impl std::fmt::Debug for Node {
297 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
299 0 : }
300 : }
|