//! Request/response types for the storage controller API (`/control/v1` prefix).
//! Implemented by the server in [`storage_controller::http`].

use std::collections::HashSet;
use std::str::FromStr;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};

use crate::models::PageserverUtilization;
use crate::{
    models::{ShardParameters, TenantConfig},
    shard::{ShardStripeSize, TenantShardId},
};

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    pub new_tenant_id: TenantShardId,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    // If omitted, create a single shard with TenantShardId::unsharded()
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    #[serde(flatten)]
    pub config: TenantConfig, // Since this field is flattened, unknown fields inside it should be rejected too
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,
    pub generation: u32,
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    pub shards: Vec<TenantCreateResponseShard>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub availability_zone_id: Option<String>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    pub node_id: NodeId,

    pub availability: Option<NodeAvailabilityWrapper>,
    pub scheduling: Option<NodeSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    pub placement: Option<PlacementPolicy>,
    pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
}

#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    pub shards: Vec<TenantLocateResponseShard>,
    pub shard_params: ShardParameters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}

#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    pub id: NodeId,

    pub availability: NodeAvailabilityWrapper,
    pub scheduling: NodeSchedulingPolicy,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    pub node_attached: Option<NodeId>,
    pub node_secondary: Vec<NodeId>,

    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed to send a compute notification to the cloud control plane, and a retry is pending
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    pub scheduling_policy: ShardSchedulingPolicy,
}

/// Explicitly migrating a particular shard is a low-level operation.
/// TODO: add a higher-level "reschedule tenant" operation, where the request
/// specifies constraints, e.g. asking a tenant to move off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    pub tenant_shard_id: TenantShardId,
    pub node_id: NodeId,
}

#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    // Normal, happy state
    Active(PageserverUtilization),
    // Node is warming up, but we expect it to become available soon. Covers
    // the time span between the re-attach response being composed on the storage controller
    // and the first successful heartbeat after the processing of the re-attach response
    // finishes on the pageserver.
    WarmingUp(Instant),
    // Offline: Tenants shouldn't try to attach here, but they may assume that their
    // secondary locations on this node still exist. Newly added nodes are in this
    // state until we successfully contact them.
    Offline,
}

impl PartialEq for NodeAvailability {
    fn eq(&self, other: &Self) -> bool {
        use NodeAvailability::*;
        matches!(
            (self, other),
            (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
        )
    }
}

impl Eq for NodeAvailability {}

// This wrapper provides serde functionality and should only be used to
// communicate with external callers that don't know or care about the
// utilisation score of the pageserver they are targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    Active,
    WarmingUp,
    Offline,
}

impl From<NodeAvailabilityWrapper> for NodeAvailability {
    fn from(val: NodeAvailabilityWrapper) -> Self {
        match val {
            // Assume the worst utilisation score to begin with. It will later be updated by
            // the heartbeats.
            NodeAvailabilityWrapper::Active => {
                NodeAvailability::Active(PageserverUtilization::full())
            }
            NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
            NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
        }
    }
}

impl From<NodeAvailability> for NodeAvailabilityWrapper {
    fn from(val: NodeAvailability) -> Self {
        match val {
            NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
            NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
            NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
        }
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    // Normal mode: the tenant's scheduled locations may be updated at will, including
    // for non-essential optimization.
    Active,

    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    // For example, this still permits a shard's attached location to move to a secondary in
    // response to a node failure, or a new secondary to be assigned if a node was removed.
    Essential,

    // No scheduling: leave the shard running wherever it currently is. Even if the shard is
    // unavailable, it will not be rescheduled to another node.
    Pause,

    // No reconciling: we will make no location_conf API calls to pageservers at all. If the
    // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}

impl Default for ShardSchedulingPolicy {
    fn default() -> Self {
        Self::Active
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> String {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers. This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}

/// Metadata health record posted from the scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    pub tenant_shard_id: TenantShardId,
    pub healthy: bool,
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    pub health_records: Vec<MetadataHealthRecord>,
}

#[cfg(test)]
mod test {
    use super::*;
    use serde_json;

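    // Not part of the original test suite: a minimal sketch of the conversion between
    // `NodeAvailability` and its serde-facing `NodeAvailabilityWrapper`. The wrapper keeps
    // only the variant, dropping the utilisation / warm-up timestamp detail, and converting
    // back assumes worst-case utilisation until a heartbeat updates it.
    #[test]
    fn node_availability_wrapper_conversion() {
        // Active(utilization) collapses to the plain Active wrapper variant.
        let wrapper = NodeAvailabilityWrapper::from(NodeAvailability::Active(
            PageserverUtilization::full(),
        ));
        assert!(matches!(wrapper, NodeAvailabilityWrapper::Active));

        // Converting back yields Active with a placeholder (worst-case) utilization.
        let availability = NodeAvailability::from(NodeAvailabilityWrapper::Active);
        assert!(matches!(availability, NodeAvailability::Active(_)));
    }
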
    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let v = PlacementPolicy::Attached(1);
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "{\"Attached\":1}");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);

        let v = PlacementPolicy::Detached;
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "\"Detached\"");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        Ok(())
    }
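
    // Not part of the original suite: a sketch checking that the `FromStr` and
    // `From<NodeSchedulingPolicy> for String` impls above round-trip, and that unknown
    // strings are rejected rather than silently defaulted.
    #[test]
    fn node_scheduling_policy_string_round_trip() -> anyhow::Result<()> {
        let all = [
            NodeSchedulingPolicy::Active,
            NodeSchedulingPolicy::Filling,
            NodeSchedulingPolicy::Pause,
            NodeSchedulingPolicy::PauseForRestart,
            NodeSchedulingPolicy::Draining,
        ];
        for policy in all {
            let encoded = String::from(policy);
            assert_eq!(encoded.parse::<NodeSchedulingPolicy>()?, policy);
        }
        assert!("bogus".parse::<NodeSchedulingPolicy>().is_err());
        Ok(())
    }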

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expected an unknown field `unknown_field` error, got: {}",
            err
        );
    }
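
    // Not part of the original suite: a hedged sketch of the serde defaults documented on
    // TenantCreateRequest. With only `new_tenant_id` supplied we expect unsharded shard
    // parameters and no explicit generation or placement policy; this assumes the flattened
    // TenantConfig deserializes cleanly from an empty set of fields.
    #[test]
    fn tenant_create_request_defaults() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
        });
        let req = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap();
        assert_eq!(req.new_tenant_id.tenant_id, id);
        assert!(req.generation.is_none());
        assert!(req.placement_policy.is_none());
        assert!(ShardParameters::is_unsharded(&req.shard_parameters));
    }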
}