Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::{
4 : controller_api::{
5 : NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
6 : TenantLocateResponseShard,
7 : },
8 : shard::TenantShardId,
9 : };
10 : use pageserver_client::mgmt_api;
11 : use reqwest::StatusCode;
12 : use serde::Serialize;
13 : use tokio_util::sync::CancellationToken;
14 : use utils::{backoff, id::NodeId};
15 :
16 : use crate::{
17 : pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
18 : };
19 :
20 : /// Represents the in-memory description of a Node.
21 : ///
22 : /// Scheduling statistics are maintened separately in [`crate::scheduler`].
23 : ///
24 : /// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
25 : /// implementation of serialization on this type is only for debug dumps.
26 : #[derive(Clone, Serialize)]
27 : pub(crate) struct Node {
28 : id: NodeId,
29 :
30 : availability: NodeAvailability,
31 : scheduling: NodeSchedulingPolicy,
32 :
33 : listen_http_addr: String,
34 : listen_http_port: u16,
35 :
36 : listen_pg_addr: String,
37 : listen_pg_port: u16,
38 :
39 : // This cancellation token means "stop any RPCs in flight to this node, and don't start
40 : // any more". It is not related to process shutdown.
41 : #[serde(skip)]
42 : cancel: CancellationToken,
43 : }
44 :
/// Describes whether, and in which direction, an update to [`Node::availability`]
/// changes the node's availability. Handed back to callers that apply such an update.
pub(crate) enum AvailabilityTransition {
    /// The node was offline and the new availability is active.
    ToActive,
    /// The node was active and the new availability is offline.
    ToOffline,
    /// Any other combination of old and new availability.
    Unchanged,
}
52 :
53 : impl Node {
54 0 : pub(crate) fn base_url(&self) -> String {
55 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
56 0 : }
57 :
58 52 : pub(crate) fn get_id(&self) -> NodeId {
59 52 : self.id
60 52 : }
61 :
62 0 : pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
63 0 : self.scheduling = scheduling
64 0 : }
65 :
66 : /// Does this registration request match `self`? This is used when deciding whether a registration
67 : /// request should be allowed to update an existing record with the same node ID.
68 0 : pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
69 0 : self.id == register_req.node_id
70 0 : && self.listen_http_addr == register_req.listen_http_addr
71 0 : && self.listen_http_port == register_req.listen_http_port
72 0 : && self.listen_pg_addr == register_req.listen_pg_addr
73 0 : && self.listen_pg_port == register_req.listen_pg_port
74 0 : }
75 :
76 : /// For a shard located on this node, populate a response object
77 : /// with this node's address information.
78 0 : pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
79 0 : TenantLocateResponseShard {
80 0 : shard_id,
81 0 : node_id: self.id,
82 0 : listen_http_addr: self.listen_http_addr.clone(),
83 0 : listen_http_port: self.listen_http_port,
84 0 : listen_pg_addr: self.listen_pg_addr.clone(),
85 0 : listen_pg_port: self.listen_pg_port,
86 0 : }
87 0 : }
88 :
89 46 : pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
90 46 : match self.get_availability_transition(availability) {
91 44 : AvailabilityTransition::ToActive => {
92 44 : // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
93 44 : // users of previously-cloned copies of the node will still see the old cancellation
94 44 : // state. For example, Reconcilers in flight will have to complete and be spawned
95 44 : // again to realize that the node has become available.
96 44 : self.cancel = CancellationToken::new();
97 44 : }
98 2 : AvailabilityTransition::ToOffline => {
99 2 : // Fire the node's cancellation token to cancel any in-flight API requests to it
100 2 : self.cancel.cancel();
101 2 : }
102 0 : AvailabilityTransition::Unchanged => {}
103 : }
104 46 : self.availability = availability;
105 46 : }
106 :
107 : /// Without modifying the availability of the node, convert the intended availability
108 : /// into a description of the transition.
109 46 : pub(crate) fn get_availability_transition(
110 46 : &self,
111 46 : availability: NodeAvailability,
112 46 : ) -> AvailabilityTransition {
113 46 : use AvailabilityTransition::*;
114 46 : use NodeAvailability::*;
115 46 :
116 46 : match (self.availability, availability) {
117 44 : (Offline, Active(_)) => ToActive,
118 2 : (Active(_), Offline) => ToOffline,
119 0 : _ => Unchanged,
120 : }
121 46 : }
122 :
123 : /// Whether we may send API requests to this node.
124 44 : pub(crate) fn is_available(&self) -> bool {
125 : // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
126 : // a reference to the original Node's cancellation status. Checking both of these results
127 : // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
128 : // when we cloned it, or if the original Node instance's cancellation token was fired.
129 44 : matches!(self.availability, NodeAvailability::Active(_)) && !self.cancel.is_cancelled()
130 44 : }
131 :
132 : /// Is this node elegible to have work scheduled onto it?
133 132 : pub(crate) fn may_schedule(&self) -> MaySchedule {
134 132 : let score = match self.availability {
135 130 : NodeAvailability::Active(score) => score,
136 2 : NodeAvailability::Offline => return MaySchedule::No,
137 : };
138 :
139 130 : match self.scheduling {
140 0 : NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
141 0 : NodeSchedulingPolicy::Draining => MaySchedule::No,
142 130 : NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
143 0 : NodeSchedulingPolicy::Pause => MaySchedule::No,
144 : }
145 132 : }
146 :
147 44 : pub(crate) fn new(
148 44 : id: NodeId,
149 44 : listen_http_addr: String,
150 44 : listen_http_port: u16,
151 44 : listen_pg_addr: String,
152 44 : listen_pg_port: u16,
153 44 : ) -> Self {
154 44 : Self {
155 44 : id,
156 44 : listen_http_addr,
157 44 : listen_http_port,
158 44 : listen_pg_addr,
159 44 : listen_pg_port,
160 44 : scheduling: NodeSchedulingPolicy::Filling,
161 44 : availability: NodeAvailability::Offline,
162 44 : cancel: CancellationToken::new(),
163 44 : }
164 44 : }
165 :
166 0 : pub(crate) fn to_persistent(&self) -> NodePersistence {
167 0 : NodePersistence {
168 0 : node_id: self.id.0 as i64,
169 0 : scheduling_policy: self.scheduling.into(),
170 0 : listen_http_addr: self.listen_http_addr.clone(),
171 0 : listen_http_port: self.listen_http_port as i32,
172 0 : listen_pg_addr: self.listen_pg_addr.clone(),
173 0 : listen_pg_port: self.listen_pg_port as i32,
174 0 : }
175 0 : }
176 :
177 0 : pub(crate) fn from_persistent(np: NodePersistence) -> Self {
178 0 : Self {
179 0 : id: NodeId(np.node_id as u64),
180 0 : // At startup we consider a node offline until proven otherwise.
181 0 : availability: NodeAvailability::Offline,
182 0 : scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
183 0 : .expect("Bad scheduling policy in DB"),
184 0 : listen_http_addr: np.listen_http_addr,
185 0 : listen_http_port: np.listen_http_port as u16,
186 0 : listen_pg_addr: np.listen_pg_addr,
187 0 : listen_pg_port: np.listen_pg_port as u16,
188 0 : cancel: CancellationToken::new(),
189 0 : }
190 0 : }
191 :
192 : /// Wrapper for issuing requests to pageserver management API: takes care of generic
193 : /// retry/backoff for retryable HTTP status codes.
194 : ///
195 : /// This will return None to indicate cancellation. Cancellation may happen from
196 : /// the cancellation token passed in, or from Self's cancellation token (i.e. node
197 : /// going offline).
198 0 : pub(crate) async fn with_client_retries<T, O, F>(
199 0 : &self,
200 0 : mut op: O,
201 0 : jwt: &Option<String>,
202 0 : warn_threshold: u32,
203 0 : max_retries: u32,
204 0 : timeout: Duration,
205 0 : cancel: &CancellationToken,
206 0 : ) -> Option<mgmt_api::Result<T>>
207 0 : where
208 0 : O: FnMut(PageserverClient) -> F,
209 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
210 0 : {
211 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
212 0 : use mgmt_api::Error::*;
213 0 : match e {
214 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
215 0 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
216 0 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
217 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
218 0 : ApiError(_, _) => true,
219 0 : Cancelled => true,
220 0 : }
221 0 : }
222 0 :
223 0 : backoff::retry(
224 0 : || {
225 0 : let http_client = reqwest::ClientBuilder::new()
226 0 : .timeout(timeout)
227 0 : .build()
228 0 : .expect("Failed to construct HTTP client");
229 0 :
230 0 : let client = PageserverClient::from_client(
231 0 : self.get_id(),
232 0 : http_client,
233 0 : self.base_url(),
234 0 : jwt.as_deref(),
235 0 : );
236 0 :
237 0 : let node_cancel_fut = self.cancel.cancelled();
238 0 :
239 0 : let op_fut = op(client);
240 :
241 0 : async {
242 : tokio::select! {
243 : r = op_fut=> {r},
244 : _ = node_cancel_fut => {
245 : Err(mgmt_api::Error::Cancelled)
246 : }}
247 0 : }
248 0 : },
249 0 : is_fatal,
250 0 : warn_threshold,
251 0 : max_retries,
252 0 : &format!(
253 0 : "Call to node {} ({}:{}) management API",
254 0 : self.id, self.listen_http_addr, self.listen_http_port
255 0 : ),
256 0 : cancel,
257 0 : )
258 0 : .await
259 0 : }
260 :
261 : /// Generate the simplified API-friendly description of a node's state
262 0 : pub(crate) fn describe(&self) -> NodeDescribeResponse {
263 0 : NodeDescribeResponse {
264 0 : id: self.id,
265 0 : availability: self.availability.into(),
266 0 : scheduling: self.scheduling,
267 0 : listen_http_addr: self.listen_http_addr.clone(),
268 0 : listen_http_port: self.listen_http_port,
269 0 : listen_pg_addr: self.listen_pg_addr.clone(),
270 0 : listen_pg_port: self.listen_pg_port,
271 0 : }
272 0 : }
273 : }
274 :
275 : impl std::fmt::Display for Node {
276 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
278 0 : }
279 : }
280 :
281 : impl std::fmt::Debug for Node {
282 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283 0 : write!(f, "{} ({})", self.id, self.listen_http_addr)
284 0 : }
285 : }
|