//! Request/response types for the storage controller
//! API (`/control/v1` prefix). Implemented by the server
//! in [`storage_controller::http`].

use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};

use crate::models::PageserverUtilization;
use crate::{
    models::{ShardParameters, TenantConfig},
    shard::{ShardStripeSize, TenantShardId},
};

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    pub new_tenant_id: TenantShardId,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    // If omitted, create a single shard with TenantShardId::unsharded()
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    #[serde(flatten)]
    pub config: TenantConfig, // since this field is flattened, unknown fields within it must be rejected as well
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,
    pub generation: u32,
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    pub shards: Vec<TenantCreateResponseShard>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub availability_zone_id: String,
}

#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    pub node_id: NodeId,

    pub availability: Option<NodeAvailabilityWrapper>,
    pub scheduling: Option<NodeSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    pub placement: Option<PlacementPolicy>,
    pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
    #[serde(flatten)]
    pub preferred_az_ids: HashMap<TenantShardId, String>,
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
    pub updated: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
}

#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    pub shards: Vec<TenantLocateResponseShard>,
    pub shard_params: ShardParameters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}

#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    pub id: NodeId,

    pub availability: NodeAvailabilityWrapper,
    pub scheduling: NodeSchedulingPolicy,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    pub node_attached: Option<NodeId>,
    pub node_secondary: Vec<NodeId>,

    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed to send a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    pub scheduling_policy: ShardSchedulingPolicy,

    pub preferred_az_id: Option<String>,
}

/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low-level operation.
/// TODO: add a higher-level "reschedule tenant" operation, where the request
/// specifies constraints such as asking a tenant to move off particular node(s).
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    pub tenant_shard_id: TenantShardId,
    pub node_id: NodeId,
}

#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    // Normal, happy state
    Active(PageserverUtilization),
    // Node is warming up, but we expect it to become available soon. Covers
    // the time span between the re-attach response being composed on the storage controller
    // and the first successful heartbeat after the processing of the re-attach response
    // finishes on the pageserver.
    WarmingUp(Instant),
    // Offline: tenants shouldn't try to attach here, but they may assume that their
    // secondary locations on this node still exist. Newly added nodes are in this
    // state until we successfully contact them.
    Offline,
}

impl PartialEq for NodeAvailability {
    fn eq(&self, other: &Self) -> bool {
        use NodeAvailability::*;
        matches!(
            (self, other),
            (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
        )
    }
}

impl Eq for NodeAvailability {}

// This wrapper provides serde functionality; it should only be used to
// communicate with external callers that don't know or care about the
// utilisation score of the pageserver being targeted.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    Active,
    WarmingUp,
    Offline,
}

impl From<NodeAvailabilityWrapper> for NodeAvailability {
    fn from(val: NodeAvailabilityWrapper) -> Self {
        match val {
            // Assume the worst utilisation score to begin with. It will later be updated by
            // the heartbeats.
            NodeAvailabilityWrapper::Active => {
                NodeAvailability::Active(PageserverUtilization::full())
            }
            NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
            NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
        }
    }
}

impl From<NodeAvailability> for NodeAvailabilityWrapper {
    fn from(val: NodeAvailability) -> Self {
        match val {
            NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
            NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
            NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
        }
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    // Normal mode: the tenant's scheduled locations may be updated at will, including
    // for non-essential optimization.
    Active,

    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    // For example, this still permits a shard's attached location to move to a secondary in
    // response to a node failure, or assigning a new secondary if a node is removed.
    Essential,

    // No scheduling: leave the shard running wherever it currently is. Even if the shard is
    // unavailable, it will not be rescheduled to another node.
    Pause,

    // No reconciling: we will make no location_conf API calls to pageservers at all. If the
    // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}

impl Default for ShardSchedulingPolicy {
    fn default() -> Self {
        Self::Active
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> String {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers. This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}

/// Metadata health record posted from the scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    pub tenant_shard_id: TenantShardId,
    pub healthy: bool,
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    pub health_records: Vec<MetadataHealthRecord>,
}

#[cfg(test)]
mod test {
    use super::*;
    use serde_json;
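
    /// Illustrative sketch (an added example, not part of the original test
    /// suite): converting a NodeAvailability into its serde wrapper discards
    /// the utilisation payload, and converting back substitutes a worst-case
    /// placeholder. Since PartialEq on NodeAvailability compares variants
    /// only, the round-trip still compares equal.
    #[test]
    fn node_availability_wrapper_roundtrip() {
        let avail = NodeAvailability::Active(PageserverUtilization::full());
        let wrapper = NodeAvailabilityWrapper::from(avail.clone());
        let roundtripped = NodeAvailability::from(wrapper);
        assert_eq!(avail, roundtripped);
    }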

    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let v = PlacementPolicy::Attached(1);
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "{\"Attached\":1}");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);

        let v = PlacementPolicy::Detached;
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "\"Detached\"");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        Ok(())
    }
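
    /// Illustrative sketch (an added example): NodeSchedulingPolicy's String
    /// and FromStr conversions are intended to be inverses, so every variant
    /// should round-trip through its string form.
    #[test]
    fn node_scheduling_policy_string_roundtrip() {
        use NodeSchedulingPolicy::*;
        for policy in [Active, Filling, Pause, PauseForRestart, Draining] {
            let encoded: String = policy.into();
            assert_eq!(encoded.parse::<NodeSchedulingPolicy>().unwrap(), policy);
        }
    }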

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }
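
    /// Illustrative sketch (an added example): `not_scrubbed_for` is
    /// (de)serialized via humantime_serde, so the API accepts human-readable
    /// duration strings such as "1day".
    #[test]
    fn metadata_health_outdated_request_duration() {
        let request: MetadataHealthListOutdatedRequest =
            serde_json::from_str(r#"{"not_scrubbed_for": "1day"}"#).unwrap();
        assert_eq!(request.not_scrubbed_for, Duration::from_secs(24 * 60 * 60));
    }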
}
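
/// Additional illustrative sketch (an added example): every field of
/// TenantCreateRequest other than `new_tenant_id` carries a serde default,
/// so a minimal request body is just the tenant id. This assumes the
/// flattened TenantConfig deserializes from an empty set of fields, as its
/// default/skip_serializing_if usage suggests.
#[cfg(test)]
mod create_request_example {
    use super::*;

    #[test]
    fn tenant_create_request_minimal() {
        let id = TenantId::generate();
        let request: TenantCreateRequest = serde_json::from_value(serde_json::json!({
            "new_tenant_id": id.to_string(),
        }))
        .unwrap();
        assert!(request.generation.is_none());
        assert!(request.placement_policy.is_none());
    }
}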