use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::{Duration, Instant};

/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};

use crate::models::PageserverUtilization;
use crate::{
    models::{ShardParameters, TenantConfig},
    shard::{ShardStripeSize, TenantShardId},
};

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    pub new_tenant_id: TenantShardId,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    // If omitted, create a single shard with TenantShardId::unsharded()
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}

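// Illustrative request body (a sketch, not taken from the source): a minimal create
// request only needs `new_tenant_id`; everything else is defaulted. Fields of the
// flattened `TenantConfig` would appear at the top level of the same object. The id and
// values below are made-up examples.
//
//   {
//     "new_tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//     "placement_policy": {"Attached": 1}
//   }
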
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,
    pub generation: u32,
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    pub shards: Vec<TenantCreateResponseShard>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub availability_zone_id: AvailabilityZone,
}

#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    pub node_id: NodeId,

    pub availability: Option<NodeAvailabilityWrapper>,
    pub scheduling: Option<NodeSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    pub placement: Option<PlacementPolicy>,
    pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
pub struct AvailabilityZone(pub String);

impl Display for AvailabilityZone {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
    #[serde(flatten)]
    pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
    pub updated: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
}

#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    pub shards: Vec<TenantLocateResponseShard>,
    pub shard_params: ShardParameters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
    pub node_id: NodeId,
    pub shards: Vec<NodeShard>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
    pub is_observed_secondary: Option<bool>,
    /// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
    pub is_intended_secondary: Option<bool>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    pub id: NodeId,

    pub availability: NodeAvailabilityWrapper,
    pub scheduling: NodeSchedulingPolicy,

    pub availability_zone_id: String,

    pub listen_http_addr: String,
    pub listen_http_port: u16,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    pub node_attached: Option<NodeId>,
    pub node_secondary: Vec<NodeId>,

    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    pub scheduling_policy: ShardSchedulingPolicy,

    pub preferred_az_id: Option<String>,
}

/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low-level operation.
/// TODO: add a higher-level "Reschedule tenant" operation, where the request
/// specifies constraints, e.g. asking the tenant to move off particular node(s).
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    pub node_id: NodeId,
    #[serde(default)]
    pub migration_config: Option<MigrationConfig>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MigrationConfig {
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_warmup_timeout: Option<Duration>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_download_request_timeout: Option<Duration>,
}

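// Illustrative migrate request body (a sketch, not from the source): both timeouts go
// through `humantime_serde`, so they can be written as human-readable duration strings,
// and either may be omitted thanks to `#[serde(default)]`. The node id and timeout
// values below are made-up examples.
//
//   {
//     "node_id": 1,
//     "migration_config": {
//       "secondary_warmup_timeout": "300s",
//       "secondary_download_request_timeout": "20s"
//     }
//   }
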
#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    // Normal, happy state
    Active(PageserverUtilization),
    // Node is warming up, but we expect it to become available soon. Covers
    // the time span between the re-attach response being composed on the storage controller
    // and the first successful heartbeat after the processing of the re-attach response
    // finishes on the pageserver.
    WarmingUp(Instant),
    // Offline: Tenants shouldn't try to attach here, but they may assume that their
    // secondary locations on this node still exist. Newly added nodes are in this
    // state until we successfully contact them.
    Offline,
}

impl PartialEq for NodeAvailability {
    fn eq(&self, other: &Self) -> bool {
        use NodeAvailability::*;
        matches!(
            (self, other),
            (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
        )
    }
}

impl Eq for NodeAvailability {}

// This wrapper provides serde functionality and it should only be used to
// communicate with external callers which don't know or care about the
// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    Active,
    WarmingUp,
    Offline,
}

impl From<NodeAvailabilityWrapper> for NodeAvailability {
    fn from(val: NodeAvailabilityWrapper) -> Self {
        match val {
            // Assume the worst utilisation score to begin with. It will later be updated by
            // the heartbeats.
            NodeAvailabilityWrapper::Active => {
                NodeAvailability::Active(PageserverUtilization::full())
            }
            NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
            NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
        }
    }
}

impl From<NodeAvailability> for NodeAvailabilityWrapper {
    fn from(val: NodeAvailability) -> Self {
        match val {
            NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
            NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
            NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
        }
    }
}

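// On the wire, `NodeAvailabilityWrapper` is just the plain variant name ("Active",
// "WarmingUp" or "Offline"); the utilisation score carried by `NodeAvailability::Active`
// never leaves the controller. For example, a node configure request might carry
// `"availability": "Offline"` (an illustrative fragment, not taken from the source).
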
/// Scheduling policy enables us to selectively disable some automatic actions that the
/// controller performs on a tenant shard. This is only set to a non-default value by
/// human intervention, and it is reset to the default value (Active) when the tenant's
/// placement policy is modified away from Attached.
///
/// The typical use of a non-Active scheduling policy is one of:
/// - Pinning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
///
/// If you're not sure which policy to use to pin a shard to its current location, you probably
/// want Pause.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    // Normal mode: the tenant's scheduled locations may be updated at will, including
    // for non-essential optimization.
    Active,

    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    // For example, this still permits a node's attachment location to change to a secondary in
    // response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    // No scheduling: leave the shard running wherever it currently is. Even if the shard is
    // unavailable, it will not be rescheduled to another node.
    Pause,

    // No reconciling: we will make no location_conf API calls to pageservers at all. If the
    // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}

impl Default for ShardSchedulingPolicy {
    fn default() -> Self {
        Self::Active
    }
}

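// Illustrative tenant policy request body for the "pin a shard" case described above
// (a sketch, not from the source). `ShardSchedulingPolicy` serializes as its variant
// name; the `Option` fields of `TenantPolicyRequest` are not marked `#[serde(default)]`,
// so an explicit `null` is the safe way to leave one of them unset:
//
//   {
//     "placement": null,
//     "scheduling": "Pause"
//   }
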
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> String {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
    Active,
    Pause,
    Decomissioned,
}

impl FromStr for SkSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "active" => Self::Active,
            "pause" => Self::Pause,
            "decomissioned" => Self::Decomissioned,
            _ => {
                return Err(anyhow::anyhow!(
                    "Unknown scheduling policy '{s}', try active,pause,decomissioned"
                ))
            }
        })
    }
}

impl From<SkSchedulingPolicy> for String {
    fn from(value: SkSchedulingPolicy) -> String {
        use SkSchedulingPolicy::*;
        match value {
            Active => "active",
            Pause => "pause",
            Decomissioned => "decomissioned",
        }
        .to_string()
    }
}

/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers. This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}

impl PlacementPolicy {
    pub fn want_secondaries(&self) -> usize {
        match self {
            PlacementPolicy::Attached(secondary_count) => *secondary_count,
            PlacementPolicy::Secondary => 1,
            PlacementPolicy::Detached => 0,
        }
    }
}

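// Worked example of the variants above: `Attached(1)` means one attached location plus
// one secondary (`want_secondaries()` returns 1, two locations in total), `Secondary`
// means a single secondary location with no attachment, and `Detached` means no
// locations at all. The JSON encoding is pinned down by `placement_policy_encoding` in
// the test module below.
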
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}

/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    pub tenant_shard_id: TenantShardId,
    pub healthy: bool,
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    pub health_records: Vec<MetadataHealthRecord>,
}

/// Publicly exposed safekeeper description
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperDescribeResponse {
    pub id: NodeId,
    pub region_id: String,
    /// 1 is special, it means just created (not currently posted to storcon).
    /// Zero or negative is not really expected.
    /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
    pub version: i64,
    pub host: String,
    pub port: i32,
    pub http_port: i32,
    pub availability_zone_id: String,
    pub scheduling_policy: SkSchedulingPolicy,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperSchedulingPolicyRequest {
    pub scheduling_policy: SkSchedulingPolicy,
}

#[cfg(test)]
mod test {
    use super::*;
    use serde_json;

    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let v = PlacementPolicy::Attached(1);
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "{\"Attached\":1}");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);

        let v = PlacementPolicy::Detached;
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "\"Detached\"");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        Ok(())
    }

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }
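
    // Sketch tests added for illustration (not present in the original source): they
    // exercise behaviour that follows directly from the definitions above, namely the
    // lowercase string round-trip of `NodeSchedulingPolicy` and the humantime-encoded
    // durations in `MigrationConfig`.
    #[test]
    fn node_scheduling_policy_round_trip() {
        // "pause_for_restart" is one of the strings accepted by the `FromStr` impl above.
        let parsed: NodeSchedulingPolicy = "pause_for_restart".parse().unwrap();
        assert_eq!(parsed, NodeSchedulingPolicy::PauseForRestart);
        assert_eq!(String::from(parsed), "pause_for_restart");
        // Unrecognised strings are rejected.
        assert!("unknown".parse::<NodeSchedulingPolicy>().is_err());
    }

    #[test]
    fn migration_config_humantime() -> anyhow::Result<()> {
        use std::time::Duration;

        // Assumes humantime_serde accepts suffixed strings such as "10s"; an omitted
        // field falls back to `None` via `#[serde(default)]`.
        let config: MigrationConfig =
            serde_json::from_str(r#"{"secondary_warmup_timeout": "10s"}"#)?;
        assert_eq!(config.secondary_warmup_timeout, Some(Duration::from_secs(10)));
        assert_eq!(config.secondary_download_request_timeout, None);
        Ok(())
    }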
}