Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::str::FromStr;
3 : use std::time::{Duration, Instant};
4 :
5 : /// Request/response types for the storage controller
6 : /// API (`/control/v1` prefix). Implemented by the server
7 : /// in [`storage_controller::http`]
8 : use serde::{Deserialize, Serialize};
9 : use utils::id::{NodeId, TenantId};
10 :
11 : use crate::models::PageserverUtilization;
12 : use crate::{
13 : models::{ShardParameters, TenantConfig},
14 : shard::{ShardStripeSize, TenantShardId},
15 : };
16 :
17 3 : #[derive(Serialize, Deserialize, Debug)]
18 : #[serde(deny_unknown_fields)]
19 : pub struct TenantCreateRequest {
20 : pub new_tenant_id: TenantShardId,
21 : #[serde(default)]
22 : #[serde(skip_serializing_if = "Option::is_none")]
23 : pub generation: Option<u32>,
24 :
25 : // If omitted, create a single shard with TenantShardId::unsharded()
26 : #[serde(default)]
27 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
28 : pub shard_parameters: ShardParameters,
29 :
30 : #[serde(default)]
31 : #[serde(skip_serializing_if = "Option::is_none")]
32 : pub placement_policy: Option<PlacementPolicy>,
33 :
34 : #[serde(flatten)]
35 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
36 : }
37 :
38 0 : #[derive(Serialize, Deserialize)]
39 : pub struct TenantCreateResponseShard {
40 : pub shard_id: TenantShardId,
41 : pub node_id: NodeId,
42 : pub generation: u32,
43 : }
44 :
45 0 : #[derive(Serialize, Deserialize)]
46 : pub struct TenantCreateResponse {
47 : pub shards: Vec<TenantCreateResponseShard>,
48 : }
49 :
50 0 : #[derive(Serialize, Deserialize)]
51 : pub struct NodeRegisterRequest {
52 : pub node_id: NodeId,
53 :
54 : pub listen_pg_addr: String,
55 : pub listen_pg_port: u16,
56 :
57 : pub listen_http_addr: String,
58 : pub listen_http_port: u16,
59 :
60 : pub availability_zone_id: String,
61 : }
62 :
63 0 : #[derive(Serialize, Deserialize)]
64 : pub struct NodeConfigureRequest {
65 : pub node_id: NodeId,
66 :
67 : pub availability: Option<NodeAvailabilityWrapper>,
68 : pub scheduling: Option<NodeSchedulingPolicy>,
69 : }
70 :
71 0 : #[derive(Serialize, Deserialize)]
72 : pub struct TenantPolicyRequest {
73 : pub placement: Option<PlacementPolicy>,
74 : pub scheduling: Option<ShardSchedulingPolicy>,
75 : }
76 :
77 0 : #[derive(Serialize, Deserialize)]
78 : pub struct ShardsPreferredAzsRequest {
79 : #[serde(flatten)]
80 : pub preferred_az_ids: HashMap<TenantShardId, String>,
81 : }
82 :
83 0 : #[derive(Serialize, Deserialize)]
84 : pub struct ShardsPreferredAzsResponse {
85 : pub updated: Vec<TenantShardId>,
86 : }
87 :
88 0 : #[derive(Serialize, Deserialize, Debug)]
89 : pub struct TenantLocateResponseShard {
90 : pub shard_id: TenantShardId,
91 : pub node_id: NodeId,
92 :
93 : pub listen_pg_addr: String,
94 : pub listen_pg_port: u16,
95 :
96 : pub listen_http_addr: String,
97 : pub listen_http_port: u16,
98 : }
99 :
100 0 : #[derive(Serialize, Deserialize)]
101 : pub struct TenantLocateResponse {
102 : pub shards: Vec<TenantLocateResponseShard>,
103 : pub shard_params: ShardParameters,
104 : }
105 :
106 0 : #[derive(Serialize, Deserialize, Debug)]
107 : pub struct TenantDescribeResponse {
108 : pub tenant_id: TenantId,
109 : pub shards: Vec<TenantDescribeResponseShard>,
110 : pub stripe_size: ShardStripeSize,
111 : pub policy: PlacementPolicy,
112 : pub config: TenantConfig,
113 : }
114 :
115 0 : #[derive(Serialize, Deserialize, Debug)]
116 : pub struct NodeShardResponse {
117 : pub node_id: NodeId,
118 : pub shards: Vec<NodeShard>,
119 : }
120 :
121 0 : #[derive(Serialize, Deserialize, Debug)]
122 : pub struct NodeShard {
123 : pub tenant_shard_id: TenantShardId,
124 : /// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
125 : pub is_observed_secondary: Option<bool>,
126 : /// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
127 : pub is_intended_secondary: Option<bool>,
128 : }
129 :
130 0 : #[derive(Serialize, Deserialize)]
131 : pub struct NodeDescribeResponse {
132 : pub id: NodeId,
133 :
134 : pub availability: NodeAvailabilityWrapper,
135 : pub scheduling: NodeSchedulingPolicy,
136 :
137 : pub listen_http_addr: String,
138 : pub listen_http_port: u16,
139 :
140 : pub listen_pg_addr: String,
141 : pub listen_pg_port: u16,
142 : }
143 :
144 0 : #[derive(Serialize, Deserialize, Debug)]
145 : pub struct TenantDescribeResponseShard {
146 : pub tenant_shard_id: TenantShardId,
147 :
148 : pub node_attached: Option<NodeId>,
149 : pub node_secondary: Vec<NodeId>,
150 :
151 : pub last_error: String,
152 :
153 : /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
154 : pub is_reconciling: bool,
155 : /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
156 : pub is_pending_compute_notification: bool,
157 : /// A shard split is currently underway
158 : pub is_splitting: bool,
159 :
160 : pub scheduling_policy: ShardSchedulingPolicy,
161 :
162 : pub preferred_az_id: Option<String>,
163 : }
164 :
165 : /// Migration request for a given tenant shard to a given node.
166 : ///
167 : /// Explicitly migrating a particular shard is a low level operation
168 : /// TODO: higher level "Reschedule tenant" operation where the request
169 : /// specifies some constraints, e.g. asking it to get off particular node(s)
170 0 : #[derive(Serialize, Deserialize, Debug)]
171 : pub struct TenantShardMigrateRequest {
172 : pub tenant_shard_id: TenantShardId,
173 : pub node_id: NodeId,
174 : }
175 :
176 : #[derive(Serialize, Clone, Debug)]
177 : #[serde(into = "NodeAvailabilityWrapper")]
178 : pub enum NodeAvailability {
179 : // Normal, happy state
180 : Active(PageserverUtilization),
181 : // Node is warming up, but we expect it to become available soon. Covers
182 : // the time span between the re-attach response being composed on the storage controller
183 : // and the first successful heartbeat after the processing of the re-attach response
184 : // finishes on the pageserver.
185 : WarmingUp(Instant),
186 : // Offline: Tenants shouldn't try to attach here, but they may assume that their
187 : // secondary locations on this node still exist. Newly added nodes are in this
188 : // state until we successfully contact them.
189 : Offline,
190 : }
191 :
192 : impl PartialEq for NodeAvailability {
193 0 : fn eq(&self, other: &Self) -> bool {
194 : use NodeAvailability::*;
195 0 : matches!(
196 0 : (self, other),
197 : (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
198 : )
199 0 : }
200 : }
201 :
202 : impl Eq for NodeAvailability {}
203 :
204 : // This wrapper provides serde functionality and it should only be used to
205 : // communicate with external callers which don't know or care about the
206 : // utilisation score of the pageserver it is targeting.
207 0 : #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
208 : pub enum NodeAvailabilityWrapper {
209 : Active,
210 : WarmingUp,
211 : Offline,
212 : }
213 :
214 : impl From<NodeAvailabilityWrapper> for NodeAvailability {
215 0 : fn from(val: NodeAvailabilityWrapper) -> Self {
216 0 : match val {
217 : // Assume the worst utilisation score to begin with. It will later be updated by
218 : // the heartbeats.
219 : NodeAvailabilityWrapper::Active => {
220 0 : NodeAvailability::Active(PageserverUtilization::full())
221 : }
222 0 : NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
223 0 : NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
224 : }
225 0 : }
226 : }
227 :
228 : impl From<NodeAvailability> for NodeAvailabilityWrapper {
229 0 : fn from(val: NodeAvailability) -> Self {
230 0 : match val {
231 0 : NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
232 0 : NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
233 0 : NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
234 : }
235 0 : }
236 : }
237 :
238 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
239 : pub enum ShardSchedulingPolicy {
240 : // Normal mode: the tenant's scheduled locations may be updated at will, including
241 : // for non-essential optimization.
242 : Active,
243 :
244 : // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
245 : // For example, this still permits a node's attachment location to change to a secondary in
246 : // response to a node failure, or to assign a new secondary if a node was removed.
247 : Essential,
248 :
249 : // No scheduling: leave the shard running wherever it currently is. Even if the shard is
250 : // unavailable, it will not be rescheduled to another node.
251 : Pause,
252 :
253 : // No reconciling: we will make no location_conf API calls to pageservers at all. If the
254 : // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
255 : Stop,
256 : }
257 :
258 : impl Default for ShardSchedulingPolicy {
259 19 : fn default() -> Self {
260 19 : Self::Active
261 19 : }
262 : }
263 :
264 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
265 : pub enum NodeSchedulingPolicy {
266 : Active,
267 : Filling,
268 : Pause,
269 : PauseForRestart,
270 : Draining,
271 : }
272 :
273 : impl FromStr for NodeSchedulingPolicy {
274 : type Err = anyhow::Error;
275 :
276 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
277 0 : match s {
278 0 : "active" => Ok(Self::Active),
279 0 : "filling" => Ok(Self::Filling),
280 0 : "pause" => Ok(Self::Pause),
281 0 : "pause_for_restart" => Ok(Self::PauseForRestart),
282 0 : "draining" => Ok(Self::Draining),
283 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
284 : }
285 0 : }
286 : }
287 :
288 : impl From<NodeSchedulingPolicy> for String {
289 0 : fn from(value: NodeSchedulingPolicy) -> String {
290 : use NodeSchedulingPolicy::*;
291 0 : match value {
292 0 : Active => "active",
293 0 : Filling => "filling",
294 0 : Pause => "pause",
295 0 : PauseForRestart => "pause_for_restart",
296 0 : Draining => "draining",
297 : }
298 0 : .to_string()
299 0 : }
300 : }
301 :
302 : /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
303 : /// to create secondary locations.
304 4 : #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
305 : pub enum PlacementPolicy {
306 : /// Normal live state: one attached pageserver and zero or more secondaries.
307 : Attached(usize),
308 : /// Create one secondary mode locations. This is useful when onboarding
309 : /// a tenant, or for an idle tenant that we might want to bring online quickly.
310 : Secondary,
311 :
312 : /// Do not attach to any pageservers. This is appropriate for tenants that
313 : /// have been idle for a long time, where we do not mind some delay in making
314 : /// them available in future.
315 : Detached,
316 : }
317 :
318 0 : #[derive(Serialize, Deserialize, Debug)]
319 : pub struct TenantShardMigrateResponse {}
320 :
321 : /// Metadata health record posted from scrubber.
322 0 : #[derive(Serialize, Deserialize, Debug)]
323 : pub struct MetadataHealthRecord {
324 : pub tenant_shard_id: TenantShardId,
325 : pub healthy: bool,
326 : pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
327 : }
328 :
329 0 : #[derive(Serialize, Deserialize, Debug)]
330 : pub struct MetadataHealthUpdateRequest {
331 : pub healthy_tenant_shards: HashSet<TenantShardId>,
332 : pub unhealthy_tenant_shards: HashSet<TenantShardId>,
333 : }
334 :
335 0 : #[derive(Serialize, Deserialize, Debug)]
336 : pub struct MetadataHealthUpdateResponse {}
337 :
338 0 : #[derive(Serialize, Deserialize, Debug)]
339 : pub struct MetadataHealthListUnhealthyResponse {
340 : pub unhealthy_tenant_shards: Vec<TenantShardId>,
341 : }
342 :
343 0 : #[derive(Serialize, Deserialize, Debug)]
344 : pub struct MetadataHealthListOutdatedRequest {
345 : #[serde(with = "humantime_serde")]
346 : pub not_scrubbed_for: Duration,
347 : }
348 :
349 0 : #[derive(Serialize, Deserialize, Debug)]
350 : pub struct MetadataHealthListOutdatedResponse {
351 : pub health_records: Vec<MetadataHealthRecord>,
352 : }
353 :
#[cfg(test)]
mod test {
    use super::*;

    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let v = PlacementPolicy::Attached(1);
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "{\"Attached\":1}");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);

        let v = PlacementPolicy::Detached;
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "\"Detached\"");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        Ok(())
    }

    /// The string form of NodeSchedulingPolicy must round-trip through FromStr,
    /// and unknown names must be rejected.
    #[test]
    fn node_scheduling_policy_string_roundtrip() -> anyhow::Result<()> {
        use NodeSchedulingPolicy::*;
        for policy in [Active, Filling, Pause, PauseForRestart, Draining] {
            let rendered = String::from(policy);
            assert_eq!(rendered.parse::<NodeSchedulingPolicy>()?, policy);
        }
        assert!("no_such_policy".parse::<NodeSchedulingPolicy>().is_err());
        Ok(())
    }

    /// NodeAvailability serializes through NodeAvailabilityWrapper, dropping payloads.
    #[test]
    fn node_availability_serializes_as_wrapper() -> anyhow::Result<()> {
        let encoded = serde_json::to_string(&NodeAvailability::WarmingUp(Instant::now()))?;
        assert_eq!(encoded, "\"WarmingUp\"");
        assert_eq!(
            serde_json::to_string(&NodeAvailability::Offline)?,
            "\"Offline\""
        );
        Ok(())
    }

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value",
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {err}"
        );
    }
}
|