Line data Source code
1 : use std::collections::HashSet;
2 : use std::str::FromStr;
3 : use std::time::{Duration, Instant};
4 :
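// Request/response types for the storage controller API (`/control/v1` prefix).
// Implemented by the server in `storage_controller::http`.
// NOTE(review): this used to be a `///` doc comment, which attaches to the `use`
// item below rather than to the module, so rustdoc never showed it. To publish it
// as module documentation, move it to the very top of the file as `//!` lines.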
8 : use serde::{Deserialize, Serialize};
9 : use utils::id::{NodeId, TenantId};
10 :
11 : use crate::{
12 : models::{ShardParameters, TenantConfig},
13 : shard::{ShardStripeSize, TenantShardId},
14 : };
15 :
16 6 : #[derive(Serialize, Deserialize, Debug)]
17 : #[serde(deny_unknown_fields)]
18 : pub struct TenantCreateRequest {
19 : pub new_tenant_id: TenantShardId,
20 : #[serde(default)]
21 : #[serde(skip_serializing_if = "Option::is_none")]
22 : pub generation: Option<u32>,
23 :
24 : // If omitted, create a single shard with TenantShardId::unsharded()
25 : #[serde(default)]
26 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
27 : pub shard_parameters: ShardParameters,
28 :
29 : #[serde(default)]
30 : #[serde(skip_serializing_if = "Option::is_none")]
31 : pub placement_policy: Option<PlacementPolicy>,
32 :
33 : #[serde(flatten)]
34 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
35 : }
36 :
37 0 : #[derive(Serialize, Deserialize)]
38 : pub struct TenantCreateResponseShard {
39 : pub shard_id: TenantShardId,
40 : pub node_id: NodeId,
41 : pub generation: u32,
42 : }
43 :
44 0 : #[derive(Serialize, Deserialize)]
45 : pub struct TenantCreateResponse {
46 : pub shards: Vec<TenantCreateResponseShard>,
47 : }
48 :
49 0 : #[derive(Serialize, Deserialize)]
50 : pub struct NodeRegisterRequest {
51 : pub node_id: NodeId,
52 :
53 : pub listen_pg_addr: String,
54 : pub listen_pg_port: u16,
55 :
56 : pub listen_http_addr: String,
57 : pub listen_http_port: u16,
58 : }
59 :
60 0 : #[derive(Serialize, Deserialize)]
61 : pub struct NodeConfigureRequest {
62 : pub node_id: NodeId,
63 :
64 : pub availability: Option<NodeAvailabilityWrapper>,
65 : pub scheduling: Option<NodeSchedulingPolicy>,
66 : }
67 :
68 0 : #[derive(Serialize, Deserialize)]
69 : pub struct TenantPolicyRequest {
70 : pub placement: Option<PlacementPolicy>,
71 : pub scheduling: Option<ShardSchedulingPolicy>,
72 : }
73 :
/// Location of one tenant shard: the node associated with it and that node's
/// listen addresses.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    /// Postgres-protocol (`pg`) listen address and port of `node_id`.
    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    /// HTTP API listen address and port of `node_id`.
    pub listen_http_addr: String,
    pub listen_http_port: u16,
}
85 :
86 0 : #[derive(Serialize, Deserialize)]
87 : pub struct TenantLocateResponse {
88 : pub shards: Vec<TenantLocateResponseShard>,
89 : pub shard_params: ShardParameters,
90 : }
91 :
/// Description of a tenant as tracked by the storage controller: its shards,
/// stripe size, placement policy and tenant configuration.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    /// Per-shard details.
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}
100 :
101 0 : #[derive(Serialize, Deserialize)]
102 : pub struct NodeDescribeResponse {
103 : pub id: NodeId,
104 :
105 : pub availability: NodeAvailabilityWrapper,
106 : pub scheduling: NodeSchedulingPolicy,
107 :
108 : pub listen_http_addr: String,
109 : pub listen_http_port: u16,
110 :
111 : pub listen_pg_addr: String,
112 : pub listen_pg_port: u16,
113 : }
114 :
/// Per-shard detail within a [`TenantDescribeResponse`].
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    /// Node holding the shard's attached location, if any.
    pub node_attached: Option<NodeId>,
    /// Nodes holding the shard's secondary locations.
    pub node_secondary: Vec<NodeId>,

    /// Last recorded error for this shard (NOTE(review): presumably empty when
    /// there is none — confirm).
    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    /// Scheduling policy currently applied to this shard.
    pub scheduling_policy: ShardSchedulingPolicy,
}
133 :
/// Request to move one specific shard to one specific node.
///
/// Explicitly migrating a particular shard is a low level operation.
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    /// The shard to migrate.
    pub tenant_shard_id: TenantShardId,
    /// The node to migrate the shard to.
    pub node_id: NodeId,
}
142 :
/// Utilisation score indicating how good a candidate a pageserver
/// is for scheduling the next tenant. See [`crate::models::PageserverUtilization`].
/// Lower values are better.
///
/// The derived ordering compares the inner `u64`, so sorting ascending puts the
/// best candidates first.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)]
pub struct UtilizationScore(pub u64);
148 :
149 : impl UtilizationScore {
150 44 : pub fn worst() -> Self {
151 44 : UtilizationScore(u64::MAX)
152 44 : }
153 : }
154 :
/// Availability of a pageserver node as tracked by the storage controller.
///
/// Serialization goes through [`NodeAvailabilityWrapper`] (`#[serde(into)]`), so
/// the utilization score and warm-up instant are not part of the serialized form.
/// Equality compares variants only and ignores their payloads.
#[derive(Serialize, Clone, Copy, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    /// Normal, happy state. Carries the node's utilization score.
    Active(UtilizationScore),
    /// Node is warming up, but we expect it to become available soon. Covers
    /// the time span between the re-attach response being composed on the storage controller
    /// and the first successful heartbeat after the processing of the re-attach response
    /// finishes on the pageserver.
    /// NOTE(review): the `Instant` presumably records when warm-up began — confirm.
    WarmingUp(Instant),
    /// Offline: Tenants shouldn't try to attach here, but they may assume that their
    /// secondary locations on this node still exist. Newly added nodes are in this
    /// state until we successfully contact them.
    Offline,
}
170 :
171 : impl PartialEq for NodeAvailability {
172 0 : fn eq(&self, other: &Self) -> bool {
173 : use NodeAvailability::*;
174 0 : matches!(
175 0 : (self, other),
176 : (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
177 : )
178 0 : }
179 : }
180 :
// `PartialEq::eq` compares enum variants only, which is reflexive, symmetric and
// transitive, so it is sound to mark the type as `Eq`.
impl Eq for NodeAvailability {}
182 :
/// Payload-free mirror of [`NodeAvailability`], used as its serialized form.
///
/// This wrapper provides serde functionality and it should only be used to
/// communicate with external callers which don't know or care about the
/// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    /// Corresponds to [`NodeAvailability::Active`].
    Active,
    /// Corresponds to [`NodeAvailability::WarmingUp`].
    WarmingUp,
    /// Corresponds to [`NodeAvailability::Offline`].
    Offline,
}
192 :
193 : impl From<NodeAvailabilityWrapper> for NodeAvailability {
194 0 : fn from(val: NodeAvailabilityWrapper) -> Self {
195 0 : match val {
196 : // Assume the worst utilisation score to begin with. It will later be updated by
197 : // the heartbeats.
198 0 : NodeAvailabilityWrapper::Active => NodeAvailability::Active(UtilizationScore::worst()),
199 0 : NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
200 0 : NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
201 : }
202 0 : }
203 : }
204 :
205 : impl From<NodeAvailability> for NodeAvailabilityWrapper {
206 0 : fn from(val: NodeAvailability) -> Self {
207 0 : match val {
208 0 : NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
209 0 : NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
210 0 : NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
211 : }
212 0 : }
213 : }
214 :
/// Per-shard limit on how freely the storage controller may change a shard's
/// locations, ranging from unrestricted (`Active`) to making no pageserver
/// calls at all (`Stop`).
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    /// Normal mode: the tenant's scheduled locations may be updated at will, including
    /// for non-essential optimization.
    Active,

    /// Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    /// For example, this still permits a node's attachment location to change to a secondary in
    /// response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    /// No scheduling: leave the shard running wherever it currently is. Even if the shard is
    /// unavailable, it will not be rescheduled to another node.
    Pause,

    /// No reconciling: we will make no location_conf API calls to pageservers at all. If the
    /// shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}
234 :
235 : impl Default for ShardSchedulingPolicy {
236 22 : fn default() -> Self {
237 22 : Self::Active
238 22 : }
239 : }
240 :
/// Scheduling state of a node.
///
/// Two textual forms exist. The derived serde form uses the variant names
/// unchanged (e.g. `"PauseForRestart"`). The `FromStr` /
/// `From<NodeSchedulingPolicy> for String` pair uses snake_case instead
/// (e.g. `"pause_for_restart"`). Keep both in mind when adding a variant.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}
249 :
250 : impl FromStr for NodeSchedulingPolicy {
251 : type Err = anyhow::Error;
252 :
253 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
254 0 : match s {
255 0 : "active" => Ok(Self::Active),
256 0 : "filling" => Ok(Self::Filling),
257 0 : "pause" => Ok(Self::Pause),
258 0 : "pause_for_restart" => Ok(Self::PauseForRestart),
259 0 : "draining" => Ok(Self::Draining),
260 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
261 : }
262 0 : }
263 : }
264 :
265 : impl From<NodeSchedulingPolicy> for String {
266 0 : fn from(value: NodeSchedulingPolicy) -> String {
267 0 : use NodeSchedulingPolicy::*;
268 0 : match value {
269 0 : Active => "active",
270 0 : Filling => "filling",
271 0 : Pause => "pause",
272 0 : PauseForRestart => "pause_for_restart",
273 0 : Draining => "draining",
274 : }
275 0 : .to_string()
276 0 : }
277 : }
278 :
/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
///
/// The serialized form is part of the API and must stay stable: unit variants
/// encode as a bare string (e.g. `"Detached"`), `Attached(n)` as `{"Attached":n}`.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    /// NOTE(review): the `usize` is presumably the number of secondaries — confirm.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers. This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}
294 :
/// Response to a [`TenantShardMigrateRequest`]. It currently carries no fields;
/// as an empty braced struct it serializes to `{}` in JSON.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
297 :
/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    /// Shard this record describes.
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard's metadata was found healthy.
    pub healthy: bool,
    /// When the shard was last scrubbed, in UTC.
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
305 :
/// Batch update of shard metadata health (presumably posted by the scrubber):
/// the shards to record as healthy and the shards to record as unhealthy.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}
311 :
/// Response to a [`MetadataHealthUpdateRequest`]; carries no fields.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}
314 :
/// Shards whose metadata is currently recorded as unhealthy.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
320 :
/// Request for health records of shards that have (presumably) not been
/// scrubbed within `not_scrubbed_for`.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    /// Staleness threshold, (de)serialized via `humantime_serde`, i.e. as a
    /// human-readable duration string rather than a seconds/nanos struct.
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}
327 :
/// Health records returned for a [`MetadataHealthListOutdatedRequest`].
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    pub health_records: Vec<MetadataHealthRecord>,
}
333 :
#[cfg(test)]
mod test {
    // `serde_json` is an extern crate and already in scope by path; the former
    // `use serde_json;` was redundant (clippy::single_component_path_imports).
    use super::*;

    /// Check stability of PlacementPolicy's serialization.
    ///
    /// The encoded form is part of the API, so every variant's encoding is pinned
    /// and round-tripped. `Secondary` was previously untested.
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let cases = [
            (PlacementPolicy::Attached(1), "{\"Attached\":1}"),
            (PlacementPolicy::Secondary, "\"Secondary\""),
            (PlacementPolicy::Detached, "\"Detached\""),
        ];
        for (v, expected) in cases {
            let encoded = serde_json::to_string(&v)?;
            assert_eq!(encoded, expected);
            assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        }
        Ok(())
    }

    /// `TenantCreateRequest` flattens `TenantConfig` but still sets
    /// `deny_unknown_fields`; a key belonging to neither must be rejected.
    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value",
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }
}
|