Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::Display;
3 : use std::str::FromStr;
4 : use std::time::{Duration, Instant};
5 :
6 : /// Request/response types for the storage controller
7 : /// API (`/control/v1` prefix). Implemented by the server
8 : /// in [`storage_controller::http`]
9 : use serde::{Deserialize, Serialize};
10 : use utils::id::{NodeId, TenantId};
11 :
12 : use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
13 : use crate::shard::{ShardStripeSize, TenantShardId};
14 :
15 2 : #[derive(Serialize, Deserialize, Debug)]
16 : #[serde(deny_unknown_fields)]
17 : pub struct TenantCreateRequest {
18 : pub new_tenant_id: TenantShardId,
19 : #[serde(default)]
20 : #[serde(skip_serializing_if = "Option::is_none")]
21 : pub generation: Option<u32>,
22 :
23 : // If omitted, create a single shard with TenantShardId::unsharded()
24 : #[serde(default)]
25 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
26 : pub shard_parameters: ShardParameters,
27 :
28 : #[serde(default)]
29 : #[serde(skip_serializing_if = "Option::is_none")]
30 : pub placement_policy: Option<PlacementPolicy>,
31 :
32 : #[serde(flatten)]
33 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
34 : }
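// Illustrative sketch (editor's addition, not part of the original module): a possible
// JSON body for TenantCreateRequest. The tenant id and generation are placeholder
// values; because `config` is flattened, the TenantConfig fields (omitted here) appear
// at the top level of the object, and `shard_parameters` may be omitted to create a
// single unsharded shard.
//
// {
//     "new_tenant_id": "<tenant id>",
//     "generation": 1,
//     "placement_policy": { "Attached": 1 }
//     /* ...TenantConfig fields, inlined by #[serde(flatten)]... */
// }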
35 :
36 0 : #[derive(Serialize, Deserialize)]
37 : pub struct TenantCreateResponseShard {
38 : pub shard_id: TenantShardId,
39 : pub node_id: NodeId,
40 : pub generation: u32,
41 : }
42 :
43 0 : #[derive(Serialize, Deserialize)]
44 : pub struct TenantCreateResponse {
45 : pub shards: Vec<TenantCreateResponseShard>,
46 : }
47 :
48 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
49 : pub struct NodeRegisterRequest {
50 : pub node_id: NodeId,
51 :
52 : pub listen_pg_addr: String,
53 : pub listen_pg_port: u16,
54 :
55 : pub listen_http_addr: String,
56 : pub listen_http_port: u16,
57 : pub listen_https_port: Option<u16>,
58 :
59 : pub availability_zone_id: AvailabilityZone,
60 : }
61 :
62 0 : #[derive(Serialize, Deserialize)]
63 : pub struct NodeConfigureRequest {
64 : pub node_id: NodeId,
65 :
66 : pub availability: Option<NodeAvailabilityWrapper>,
67 : pub scheduling: Option<NodeSchedulingPolicy>,
68 : }
69 :
70 0 : #[derive(Serialize, Deserialize)]
71 : pub struct TenantPolicyRequest {
72 : pub placement: Option<PlacementPolicy>,
73 : pub scheduling: Option<ShardSchedulingPolicy>,
74 : }
75 :
76 0 : #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
77 : pub struct AvailabilityZone(pub String);
78 :
79 : impl Display for AvailabilityZone {
80 300 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 300 : write!(f, "{}", self.0)
82 300 : }
83 : }
84 :
85 0 : #[derive(Serialize, Deserialize)]
86 : pub struct ShardsPreferredAzsRequest {
87 : #[serde(flatten)]
88 : pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
89 : }
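// Illustrative sketch (editor's addition): because the map is flattened, the request
// body is a bare JSON object keyed by tenant shard id, with a null value clearing the
// preferred AZ. The ids and AZ name below are placeholders.
//
// {
//     "<tenant_shard_id_a>": "us-east-2b",
//     "<tenant_shard_id_b>": null
// }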
90 :
91 0 : #[derive(Serialize, Deserialize)]
92 : pub struct ShardsPreferredAzsResponse {
93 : pub updated: Vec<TenantShardId>,
94 : }
95 :
96 0 : #[derive(Serialize, Deserialize, Debug)]
97 : pub struct TenantLocateResponseShard {
98 : pub shard_id: TenantShardId,
99 : pub node_id: NodeId,
100 :
101 : pub listen_pg_addr: String,
102 : pub listen_pg_port: u16,
103 :
104 : pub listen_http_addr: String,
105 : pub listen_http_port: u16,
106 : pub listen_https_port: Option<u16>,
107 : }
108 :
109 0 : #[derive(Serialize, Deserialize)]
110 : pub struct TenantLocateResponse {
111 : pub shards: Vec<TenantLocateResponseShard>,
112 : pub shard_params: ShardParameters,
113 : }
114 :
115 0 : #[derive(Serialize, Deserialize, Debug)]
116 : pub struct TenantDescribeResponse {
117 : pub tenant_id: TenantId,
118 : pub shards: Vec<TenantDescribeResponseShard>,
119 : pub stripe_size: ShardStripeSize,
120 : pub policy: PlacementPolicy,
121 : pub config: TenantConfig,
122 : }
123 :
124 0 : #[derive(Serialize, Deserialize, Debug)]
125 : pub struct NodeShardResponse {
126 : pub node_id: NodeId,
127 : pub shards: Vec<NodeShard>,
128 : }
129 :
130 0 : #[derive(Serialize, Deserialize, Debug)]
131 : pub struct NodeShard {
132 : pub tenant_shard_id: TenantShardId,
133 : /// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
134 : pub is_observed_secondary: Option<bool>,
135 : /// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
136 : pub is_intended_secondary: Option<bool>,
137 : }
138 :
139 0 : #[derive(Serialize, Deserialize)]
140 : pub struct NodeDescribeResponse {
141 : pub id: NodeId,
142 :
143 : pub availability: NodeAvailabilityWrapper,
144 : pub scheduling: NodeSchedulingPolicy,
145 :
146 : pub availability_zone_id: String,
147 :
148 : pub listen_http_addr: String,
149 : pub listen_http_port: u16,
150 : pub listen_https_port: Option<u16>,
151 :
152 : pub listen_pg_addr: String,
153 : pub listen_pg_port: u16,
154 : }
155 :
156 0 : #[derive(Serialize, Deserialize, Debug)]
157 : pub struct TenantDescribeResponseShard {
158 : pub tenant_shard_id: TenantShardId,
159 :
160 : pub node_attached: Option<NodeId>,
161 : pub node_secondary: Vec<NodeId>,
162 :
163 : pub last_error: String,
164 :
165 : /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
166 : pub is_reconciling: bool,
167 : /// This shard failed to send a compute notification to the cloud control plane, and a retry is pending.
168 : pub is_pending_compute_notification: bool,
169 : /// A shard split is currently underway
170 : pub is_splitting: bool,
171 :
172 : pub scheduling_policy: ShardSchedulingPolicy,
173 :
174 : pub preferred_az_id: Option<String>,
175 : }
176 :
177 : /// Migration request for a given tenant shard to a given node.
178 : ///
179 : /// Explicitly migrating a particular shard is a low-level operation.
180 : /// TODO: a higher-level "Reschedule tenant" operation where the request
181 : /// specifies some constraints, e.g. asking it to get off particular node(s).
182 1 : #[derive(Serialize, Deserialize, Debug)]
183 : pub struct TenantShardMigrateRequest {
184 : pub node_id: NodeId,
185 :
186 : /// Optionally, callers may specify the node they are migrating _from_, and the server will
187 : /// reject the request if the shard is no longer attached there: this enables writing safer
188 : /// clients that don't risk fighting with some other movement of the shard.
189 : #[serde(default)]
190 : pub origin_node_id: Option<NodeId>,
191 :
192 : #[serde(default)]
193 : pub migration_config: MigrationConfig,
194 : }
195 :
196 0 : #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
197 : pub struct MigrationConfig {
198 : /// If true, the migration will be executed even if it is to a location with a sub-optimal scheduling
199 : /// score: this is usually not what you want, and if you use this then you'll also need to set the
200 : /// tenant's scheduling policy to Essential or Pause to avoid the optimiser reverting your migration.
201 : ///
202 : /// Default: false
203 : #[serde(default)]
204 : pub override_scheduler: bool,
205 :
206 : /// If true, the migration will be done gracefully by creating a secondary location first and
207 : /// waiting for it to warm up before cutting over. If false, if there is no existing secondary
208 : /// location at the destination, the tenant will be migrated immediately. If the tenant's data
209 : /// can't be downloaded within [`Self::secondary_warmup_timeout`], then the migration will go
210 : /// ahead but run with a cold cache that can severely reduce performance until it warms up.
211 : ///
212 : /// When doing a graceful migration, the migration API returns as soon as it is started.
213 : ///
214 : /// Default: true
215 : #[serde(default = "default_prewarm")]
216 : pub prewarm: bool,
217 :
218 : /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
219 : /// overall for secondary warmup before cutting over
220 : #[serde(default)]
221 : #[serde(with = "humantime_serde")]
222 : pub secondary_warmup_timeout: Option<Duration>,
223 : /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
224 : /// within each secondary download poll call to pageserver.
225 : #[serde(default)]
226 : #[serde(with = "humantime_serde")]
227 : pub secondary_download_request_timeout: Option<Duration>,
228 : }
229 :
230 3 : fn default_prewarm() -> bool {
231 3 : true
232 3 : }
233 :
234 : impl Default for MigrationConfig {
235 2 : fn default() -> Self {
236 2 : Self {
237 2 : override_scheduler: false,
238 2 : prewarm: default_prewarm(),
239 2 : secondary_warmup_timeout: None,
240 2 : secondary_download_request_timeout: None,
241 2 : }
242 2 : }
243 : }
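// Illustrative sketch (editor's addition): constructing a graceful migration request.
// Node ids and the timeout are arbitrary placeholder values; in JSON, the duration
// fields accept humantime strings (e.g. "300s") because of `humantime_serde`.
//
// let req = TenantShardMigrateRequest {
//     node_id: NodeId(5),
//     origin_node_id: Some(NodeId(3)),
//     migration_config: MigrationConfig {
//         prewarm: true,
//         secondary_warmup_timeout: Some(Duration::from_secs(300)),
//         ..MigrationConfig::default()
//     },
// };
//
// A JSON body that decodes to the same request:
// { "node_id": 5, "origin_node_id": 3,
//   "migration_config": { "prewarm": true, "secondary_warmup_timeout": "300s" } }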
244 :
245 : #[derive(Serialize, Clone, Debug)]
246 : #[serde(into = "NodeAvailabilityWrapper")]
247 : pub enum NodeAvailability {
248 : // Normal, happy state
249 : Active(PageserverUtilization),
250 : // Node is warming up, but we expect it to become available soon. Covers
251 : // the time span between the re-attach response being composed on the storage controller
252 : // and the first successful heartbeat after the processing of the re-attach response
253 : // finishes on the pageserver.
254 : WarmingUp(Instant),
255 : // Offline: Tenants shouldn't try to attach here, but they may assume that their
256 : // secondary locations on this node still exist. Newly added nodes are in this
257 : // state until we successfully contact them.
258 : Offline,
259 : }
260 :
261 : impl PartialEq for NodeAvailability {
262 0 : fn eq(&self, other: &Self) -> bool {
263 : use NodeAvailability::*;
264 0 : matches!(
265 0 : (self, other),
266 : (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
267 : )
268 0 : }
269 : }
270 :
271 : impl Eq for NodeAvailability {}
272 :
273 : // This wrapper provides serde functionality; it should only be used to
274 : // communicate with external callers that don't know or care about the
275 : // utilisation score of the pageserver they are targeting.
276 0 : #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
277 : pub enum NodeAvailabilityWrapper {
278 : Active,
279 : WarmingUp,
280 : Offline,
281 : }
282 :
283 : impl From<NodeAvailabilityWrapper> for NodeAvailability {
284 0 : fn from(val: NodeAvailabilityWrapper) -> Self {
285 0 : match val {
286 : // Assume the worst utilisation score to begin with. It will later be updated by
287 : // the heartbeats.
288 : NodeAvailabilityWrapper::Active => {
289 0 : NodeAvailability::Active(PageserverUtilization::full())
290 : }
291 0 : NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
292 0 : NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
293 : }
294 0 : }
295 : }
296 :
297 : impl From<NodeAvailability> for NodeAvailabilityWrapper {
298 0 : fn from(val: NodeAvailability) -> Self {
299 0 : match val {
300 0 : NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
301 0 : NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
302 0 : NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
303 : }
304 0 : }
305 : }
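// Illustrative sketch (editor's addition): external callers set availability through the
// wrapper's unit variants, e.g. marking a hypothetical node 1 offline and pausing
// scheduling on it via NodeConfigureRequest.
//
// let req = NodeConfigureRequest {
//     node_id: NodeId(1),
//     availability: Some(NodeAvailabilityWrapper::Offline),
//     scheduling: Some(NodeSchedulingPolicy::Pause),
// };
// // serializes as: {"node_id":1,"availability":"Offline","scheduling":"Pause"}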
306 :
307 : /// Scheduling policy enables us to selectively disable some automatic actions that the
308 : /// controller performs on a tenant shard. This is only set to a non-default value by
309 : /// human intervention, and it is reset to the default value (Active) when the tenant's
310 : /// placement policy is modified away from Attached.
311 : ///
312 : /// The typical use of a non-Active scheduling policy is one of:
313 : /// - Pinning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
314 : /// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
315 : ///
316 : /// If you're not sure which policy to use to pin a shard to its current location, you probably
317 : /// want Pause.
318 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
319 : pub enum ShardSchedulingPolicy {
320 : // Normal mode: the tenant's scheduled locations may be updated at will, including
321 : // for non-essential optimization.
322 : Active,
323 :
324 : // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
325 : // For example, this still permits a shard's attached location to move to a secondary in
326 : // response to a node failure, or to assign a new secondary if a node was removed.
327 : Essential,
328 :
329 : // No scheduling: leave the shard running wherever it currently is. Even if the shard is
330 : // unavailable, it will not be rescheduled to another node.
331 : Pause,
332 :
333 : // No reconciling: we will make no location_conf API calls to pageservers at all. If the
334 : // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
335 : Stop,
336 : }
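// Illustrative sketch (editor's addition): pinning a shard to its current location, as
// described above, by pausing its scheduling policy. Only `scheduling` is set here;
// `placement` is left as None.
//
// let pin = TenantPolicyRequest {
//     placement: None,
//     scheduling: Some(ShardSchedulingPolicy::Pause),
// };
// // serializes as: {"placement":null,"scheduling":"Pause"}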
337 :
338 : impl Default for ShardSchedulingPolicy {
339 12843 : fn default() -> Self {
340 12843 : Self::Active
341 12843 : }
342 : }
343 :
344 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
345 : pub enum NodeSchedulingPolicy {
346 : Active,
347 : Filling,
348 : Pause,
349 : PauseForRestart,
350 : Draining,
351 : }
352 :
353 : impl FromStr for NodeSchedulingPolicy {
354 : type Err = anyhow::Error;
355 :
356 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
357 0 : match s {
358 0 : "active" => Ok(Self::Active),
359 0 : "filling" => Ok(Self::Filling),
360 0 : "pause" => Ok(Self::Pause),
361 0 : "pause_for_restart" => Ok(Self::PauseForRestart),
362 0 : "draining" => Ok(Self::Draining),
363 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
364 : }
365 0 : }
366 : }
367 :
368 : impl From<NodeSchedulingPolicy> for String {
369 0 : fn from(value: NodeSchedulingPolicy) -> String {
370 : use NodeSchedulingPolicy::*;
371 0 : match value {
372 0 : Active => "active",
373 0 : Filling => "filling",
374 0 : Pause => "pause",
375 0 : PauseForRestart => "pause_for_restart",
376 0 : Draining => "draining",
377 : }
378 0 : .to_string()
379 0 : }
380 : }
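// Note (editor's addition): this FromStr/From<String> pair uses lower-snake-case strings
// ("pause_for_restart"), whereas the derived serde representation serializes the unit
// variants as their names ("PauseForRestart"). An illustrative round trip:
//
// let p: NodeSchedulingPolicy = "pause_for_restart".parse()?;
// assert_eq!(String::from(p), "pause_for_restart");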
381 :
382 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
383 : pub enum SkSchedulingPolicy {
384 : Active,
385 : Pause,
386 : Decomissioned,
387 : }
388 :
389 : impl FromStr for SkSchedulingPolicy {
390 : type Err = anyhow::Error;
391 :
392 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
393 0 : Ok(match s {
394 0 : "active" => Self::Active,
395 0 : "pause" => Self::Pause,
396 0 : "decomissioned" => Self::Decomissioned,
397 : _ => {
398 0 : return Err(anyhow::anyhow!(
399 0 : "Unknown scheduling policy '{s}', try active,pause,decomissioned"
400 0 : ));
401 : }
402 : })
403 0 : }
404 : }
405 :
406 : impl From<SkSchedulingPolicy> for String {
407 0 : fn from(value: SkSchedulingPolicy) -> String {
408 : use SkSchedulingPolicy::*;
409 0 : match value {
410 0 : Active => "active",
411 0 : Pause => "pause",
412 0 : Decomissioned => "decomissioned",
413 : }
414 0 : .to_string()
415 0 : }
416 : }
417 :
418 : /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
419 : /// to create secondary locations.
420 2 : #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
421 : pub enum PlacementPolicy {
422 : /// Normal live state: one attached pageserver and zero or more secondaries.
423 : Attached(usize),
424 : /// Create one secondary-mode location. This is useful when onboarding
425 : /// a tenant, or for an idle tenant that we might want to bring online quickly.
426 : Secondary,
427 :
428 : /// Do not attach to any pageservers. This is appropriate for tenants that
429 : /// have been idle for a long time, where we do not mind some delay in making
430 : /// them available in future.
431 : Detached,
432 : }
433 :
434 : impl PlacementPolicy {
435 55 : pub fn want_secondaries(&self) -> usize {
436 55 : match self {
437 52 : PlacementPolicy::Attached(secondary_count) => *secondary_count,
438 3 : PlacementPolicy::Secondary => 1,
439 0 : PlacementPolicy::Detached => 0,
440 : }
441 55 : }
442 : }
443 :
444 0 : #[derive(Serialize, Deserialize, Debug)]
445 : pub struct TenantShardMigrateResponse {}
446 :
447 : /// Metadata health record posted from scrubber.
448 0 : #[derive(Serialize, Deserialize, Debug)]
449 : pub struct MetadataHealthRecord {
450 : pub tenant_shard_id: TenantShardId,
451 : pub healthy: bool,
452 : pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
453 : }
454 :
455 0 : #[derive(Serialize, Deserialize, Debug)]
456 : pub struct MetadataHealthUpdateRequest {
457 : pub healthy_tenant_shards: HashSet<TenantShardId>,
458 : pub unhealthy_tenant_shards: HashSet<TenantShardId>,
459 : }
460 :
461 0 : #[derive(Serialize, Deserialize, Debug)]
462 : pub struct MetadataHealthUpdateResponse {}
463 :
464 0 : #[derive(Serialize, Deserialize, Debug)]
465 : pub struct MetadataHealthListUnhealthyResponse {
466 : pub unhealthy_tenant_shards: Vec<TenantShardId>,
467 : }
468 :
469 0 : #[derive(Serialize, Deserialize, Debug)]
470 : pub struct MetadataHealthListOutdatedRequest {
471 : #[serde(with = "humantime_serde")]
472 : pub not_scrubbed_for: Duration,
473 : }
474 :
475 0 : #[derive(Serialize, Deserialize, Debug)]
476 : pub struct MetadataHealthListOutdatedResponse {
477 : pub health_records: Vec<MetadataHealthRecord>,
478 : }
479 :
480 : /// Publicly exposed safekeeper description
481 0 : #[derive(Serialize, Deserialize, Clone)]
482 : pub struct SafekeeperDescribeResponse {
483 : pub id: NodeId,
484 : pub region_id: String,
485 : /// 1 is special: it means just created (not currently posted to storcon).
486 : /// Zero or negative is not really expected.
487 : /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
488 : pub version: i64,
489 : pub host: String,
490 : pub port: i32,
491 : pub http_port: i32,
492 : pub https_port: Option<i32>,
493 : pub availability_zone_id: String,
494 : pub scheduling_policy: SkSchedulingPolicy,
495 : }
496 :
497 0 : #[derive(Serialize, Deserialize, Clone)]
498 : pub struct SafekeeperSchedulingPolicyRequest {
499 : pub scheduling_policy: SkSchedulingPolicy,
500 : }
501 :
502 : #[cfg(test)]
503 : mod test {
504 : use serde_json;
505 :
506 : use super::*;
507 :
508 : /// Check stability of PlacementPolicy's serialization
509 : #[test]
510 1 : fn placement_policy_encoding() -> anyhow::Result<()> {
511 1 : let v = PlacementPolicy::Attached(1);
512 1 : let encoded = serde_json::to_string(&v)?;
513 1 : assert_eq!(encoded, "{\"Attached\":1}");
514 1 : assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
515 :
516 1 : let v = PlacementPolicy::Detached;
517 1 : let encoded = serde_json::to_string(&v)?;
518 1 : assert_eq!(encoded, "\"Detached\"");
519 1 : assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
520 1 : Ok(())
521 1 : }
522 :
523 : #[test]
524 1 : fn test_reject_unknown_field() {
525 1 : let id = TenantId::generate();
526 1 : let create_request = serde_json::json!({
527 1 : "new_tenant_id": id.to_string(),
528 1 : "unknown_field": "unknown_value".to_string(),
529 1 : });
530 1 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
531 1 : assert!(
532 1 : err.to_string().contains("unknown field `unknown_field`"),
533 0 : "expect unknown field `unknown_field` error, got: {}",
534 : err
535 : );
536 1 : }
537 :
538 : /// Check that a minimal migrate request with no config results in the expected default settings
539 : #[test]
540 1 : fn test_migrate_request_decode_defaults() {
541 1 : let json = r#"{
542 1 : "node_id": 123
543 1 : }"#;
544 1 :
545 1 : let request: TenantShardMigrateRequest = serde_json::from_str(json).unwrap();
546 1 : assert_eq!(request.node_id, NodeId(123));
547 1 : assert_eq!(request.origin_node_id, None);
548 1 : assert!(!request.migration_config.override_scheduler);
549 1 : assert!(request.migration_config.prewarm);
550 1 : assert_eq!(request.migration_config.secondary_warmup_timeout, None);
551 1 : assert_eq!(
552 1 : request.migration_config.secondary_download_request_timeout,
553 1 : None
554 1 : );
555 1 : }
556 :
557 : /// Check that an empty migration config results in the expected default settings
558 : #[test]
559 1 : fn test_migration_config_decode_defaults() {
560 1 : // Specify an empty config
561 1 : let json = r#"{
562 1 : }"#;
563 1 :
564 1 : let config: MigrationConfig = serde_json::from_str(json).unwrap();
565 1 :
566 1 : // Check each field's expected default value
567 1 : assert!(!config.override_scheduler);
568 1 : assert!(config.prewarm);
569 1 : assert_eq!(config.secondary_warmup_timeout, None);
570 1 : assert_eq!(config.secondary_download_request_timeout, None);
572 :
573 : // Consistency check that the Default impl agrees with our serde defaults
574 1 : assert_eq!(MigrationConfig::default(), config);
575 1 : }
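
    /// Illustrative round-trip sketch (editor's addition, not in the original test suite):
    /// serialize a fully populated migrate request and check that it decodes to the same
    /// values. The node ids and timeout are arbitrary.
    #[test]
    fn test_migrate_request_roundtrip_sketch() -> anyhow::Result<()> {
        let req = TenantShardMigrateRequest {
            node_id: NodeId(5),
            origin_node_id: Some(NodeId(3)),
            migration_config: MigrationConfig {
                override_scheduler: false,
                prewarm: false,
                secondary_warmup_timeout: Some(Duration::from_secs(300)),
                secondary_download_request_timeout: None,
            },
        };
        let encoded = serde_json::to_string(&req)?;
        let decoded: TenantShardMigrateRequest = serde_json::from_str(&encoded)?;
        assert_eq!(decoded.node_id, req.node_id);
        assert_eq!(decoded.origin_node_id, req.origin_node_id);
        assert_eq!(decoded.migration_config, req.migration_config);
        Ok(())
    }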
576 : }