use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::{Duration, Instant};

/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;

use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    pub new_tenant_id: TenantShardId,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    // If omitted, create a single shard with TenantShardId::unsharded()
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,
    pub generation: u32,
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    pub shards: Vec<TenantCreateResponseShard>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,

    pub availability_zone_id: AvailabilityZone,
}

#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    pub node_id: NodeId,

    pub availability: Option<NodeAvailabilityWrapper>,
    pub scheduling: Option<NodeSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    pub placement: Option<PlacementPolicy>,
    pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
pub struct AvailabilityZone(pub String);

impl Display for AvailabilityZone {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
    #[serde(flatten)]
    pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
    pub updated: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    pub shards: Vec<TenantLocateResponseShard>,
    pub shard_params: ShardParameters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
    pub node_id: NodeId,
    pub shards: Vec<NodeShard>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard is observed as a secondary on this node. True = yes, False = no, None = shard not present on this node.
    pub is_observed_secondary: Option<bool>,
    /// Whether the shard is intended to be a secondary on this node. True = yes, False = no, None = shard not present on this node.
    pub is_intended_secondary: Option<bool>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    pub id: NodeId,

    pub availability: NodeAvailabilityWrapper,
    pub scheduling: NodeSchedulingPolicy,

    pub availability_zone_id: String,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    pub node_attached: Option<NodeId>,
    pub node_secondary: Vec<NodeId>,

    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,
    /// A timeline is being imported into this tenant
    pub is_importing: bool,

    pub scheduling_policy: ShardSchedulingPolicy,

    pub preferred_az_id: Option<String>,
}

/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low-level operation.
/// TODO: a higher-level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    pub node_id: NodeId,

    /// Optionally, callers may specify the node they are migrating _from_, and the server will
    /// reject the request if the shard is no longer attached there: this enables writing safer
    /// clients that don't risk fighting with some other movement of the shard.
    #[serde(default)]
    pub origin_node_id: Option<NodeId>,

    #[serde(default)]
    pub migration_config: MigrationConfig,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct MigrationConfig {
    /// If true, the migration will be executed even if it is to a location with a sub-optimal scheduling
    /// score: this is usually not what you want, and if you use this then you'll also need to set the
    /// tenant's scheduling policy to Essential or Pause to avoid the optimiser reverting your migration.
    ///
    /// Default: false
    #[serde(default)]
    pub override_scheduler: bool,

    /// If true, the migration will be done gracefully by creating a secondary location first and
    /// waiting for it to warm up before cutting over. If false, if there is no existing secondary
    /// location at the destination, the tenant will be migrated immediately. If the tenant's data
    /// can't be downloaded within [`Self::secondary_warmup_timeout`], then the migration will go
    /// ahead but run with a cold cache that can severely reduce performance until it warms up.
    ///
    /// When doing a graceful migration, the migration API returns as soon as it is started.
    ///
    /// Default: true
    #[serde(default = "default_prewarm")]
    pub prewarm: bool,

    /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
    /// overall for secondary warmup before cutting over
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_warmup_timeout: Option<Duration>,
    /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
    /// within each secondary download poll call to pageserver.
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_download_request_timeout: Option<Duration>,
}

fn default_prewarm() -> bool {
    true
}

impl Default for MigrationConfig {
    fn default() -> Self {
        Self {
            override_scheduler: false,
            prewarm: default_prewarm(),
            secondary_warmup_timeout: None,
            secondary_download_request_timeout: None,
        }
    }
}

#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    // Normal, happy state
    Active(PageserverUtilization),
    // Node is warming up, but we expect it to become available soon. Covers
    // the time span between the re-attach response being composed on the storage controller
    // and the first successful heartbeat after the processing of the re-attach response
    // finishes on the pageserver.
    WarmingUp(Instant),
    // Offline: Tenants shouldn't try to attach here, but they may assume that their
    // secondary locations on this node still exist. Newly added nodes are in this
    // state until we successfully contact them.
    Offline,
}

impl PartialEq for NodeAvailability {
    fn eq(&self, other: &Self) -> bool {
        use NodeAvailability::*;
        matches!(
            (self, other),
            (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
        )
    }
}

impl Eq for NodeAvailability {}

// This wrapper provides serde functionality and it should only be used to
// communicate with external callers which don't know or care about the
// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    Active,
    WarmingUp,
    Offline,
}

impl From<NodeAvailabilityWrapper> for NodeAvailability {
    fn from(val: NodeAvailabilityWrapper) -> Self {
        match val {
            // Assume the worst utilisation score to begin with. It will later be updated by
            // the heartbeats.
            NodeAvailabilityWrapper::Active => {
                NodeAvailability::Active(PageserverUtilization::full())
            }
            NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
            NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
        }
    }
}

impl From<NodeAvailability> for NodeAvailabilityWrapper {
    fn from(val: NodeAvailability) -> Self {
        match val {
            NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
            NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
            NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
        }
    }
}

/// Scheduling policy enables us to selectively disable some automatic actions that the
/// controller performs on a tenant shard. This is only set to a non-default value by
/// human intervention, and it is reset to the default value (Active) when the tenant's
/// placement policy is modified away from Attached.
///
/// The typical use of a non-Active scheduling policy is one of:
/// - Pinning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
///
/// If you're not sure which policy to use to pin a shard to its current location, you probably
/// want Pause.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    // Normal mode: the tenant's scheduled locations may be updated at will, including
    // for non-essential optimization.
    Active,

    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    // For example, this still permits a node's attachment location to change to a secondary in
    // response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    // No scheduling: leave the shard running wherever it currently is. Even if the shard is
    // unavailable, it will not be rescheduled to another node.
    Pause,

    // No reconciling: we will make no location_conf API calls to pageservers at all. If the
    // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}

impl Default for ShardSchedulingPolicy {
    fn default() -> Self {
        Self::Active
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> String {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
    Active,
    Pause,
    Decomissioned,
}

impl FromStr for SkSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "active" => Self::Active,
            "pause" => Self::Pause,
            "decomissioned" => Self::Decomissioned,
            _ => {
                return Err(anyhow::anyhow!(
                    "Unknown scheduling policy '{s}', try active,pause,decomissioned"
                ));
            }
        })
    }
}

impl From<SkSchedulingPolicy> for String {
    fn from(value: SkSchedulingPolicy) -> String {
        use SkSchedulingPolicy::*;
        match value {
            Active => "active",
            Pause => "pause",
            Decomissioned => "decomissioned",
        }
        .to_string()
    }
}

/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers. This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}

impl PlacementPolicy {
    pub fn want_secondaries(&self) -> usize {
        match self {
            PlacementPolicy::Attached(secondary_count) => *secondary_count,
            PlacementPolicy::Secondary => 1,
            PlacementPolicy::Detached => 0,
        }
    }
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}

/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    pub tenant_shard_id: TenantShardId,
    pub healthy: bool,
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    pub health_records: Vec<MetadataHealthRecord>,
}

/// Publicly exposed safekeeper description
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperDescribeResponse {
    pub id: NodeId,
    pub region_id: String,
    /// 1 is special, it means just created (not currently posted to storcon).
    /// Zero or negative is not really expected.
    /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
    pub version: i64,
    pub host: String,
    pub port: i32,
    pub http_port: i32,
    pub https_port: Option<i32>,
    pub availability_zone_id: String,
    pub scheduling_policy: SkSchedulingPolicy,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperSchedulingPolicyRequest {
    pub scheduling_policy: SkSchedulingPolicy,
}

/// Import request for safekeeper timelines.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineImportRequest {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub start_lsn: Lsn,
    pub sk_set: Vec<NodeId>,
}

#[cfg(test)]
mod test {
    use serde_json;

    use super::*;

    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        let v = PlacementPolicy::Attached(1);
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "{\"Attached\":1}");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);

        let v = PlacementPolicy::Detached;
        let encoded = serde_json::to_string(&v)?;
        assert_eq!(encoded, "\"Detached\"");
        assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        Ok(())
    }
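
    /// Sketch of an additional consistency check (not part of the original test suite):
    /// the secondary counts implied by each placement policy, as defined by
    /// `PlacementPolicy::want_secondaries` above.
    #[test]
    fn placement_policy_want_secondaries() {
        assert_eq!(PlacementPolicy::Attached(2).want_secondaries(), 2);
        assert_eq!(PlacementPolicy::Secondary.want_secondaries(), 1);
        assert_eq!(PlacementPolicy::Detached.want_secondaries(), 0);
    }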

    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }
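
    /// Sketch of a round-trip check (added here, not in the original suite) between
    /// `NodeSchedulingPolicy` and its string form, exercising only the `FromStr` and
    /// `From<NodeSchedulingPolicy> for String` impls defined above.
    #[test]
    fn node_scheduling_policy_string_roundtrip() {
        for policy in [
            NodeSchedulingPolicy::Active,
            NodeSchedulingPolicy::Filling,
            NodeSchedulingPolicy::Pause,
            NodeSchedulingPolicy::PauseForRestart,
            NodeSchedulingPolicy::Draining,
        ] {
            let s: String = policy.into();
            assert_eq!(s.parse::<NodeSchedulingPolicy>().unwrap(), policy);
        }
    }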

    /// Check that a minimal migrate request with no config results in the expected default settings
    #[test]
    fn test_migrate_request_decode_defaults() {
        let json = r#"{
            "node_id": 123
        }"#;

        let request: TenantShardMigrateRequest = serde_json::from_str(json).unwrap();
        assert_eq!(request.node_id, NodeId(123));
        assert_eq!(request.origin_node_id, None);
        assert!(!request.migration_config.override_scheduler);
        assert!(request.migration_config.prewarm);
        assert_eq!(request.migration_config.secondary_warmup_timeout, None);
        assert_eq!(
            request.migration_config.secondary_download_request_timeout,
            None
        );
    }
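
    /// Sketch of the same round-trip check for `SkSchedulingPolicy` (added here, not in
    /// the original suite), using only the `FromStr` and `From<SkSchedulingPolicy> for String`
    /// impls above; note the intentionally preserved `decomissioned` spelling of its string form.
    #[test]
    fn sk_scheduling_policy_string_roundtrip() {
        for policy in [
            SkSchedulingPolicy::Active,
            SkSchedulingPolicy::Pause,
            SkSchedulingPolicy::Decomissioned,
        ] {
            let s: String = policy.into();
            assert_eq!(s.parse::<SkSchedulingPolicy>().unwrap(), policy);
        }
    }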

    /// Check that an empty migration config results in the expected default settings
    #[test]
    fn test_migration_config_decode_defaults() {
        // Specify none of the config's fields
        let json = r#"{
        }"#;

        let config: MigrationConfig = serde_json::from_str(json).unwrap();

        // Check each field's expected default value
        assert!(!config.override_scheduler);
        assert!(config.prewarm);
        assert_eq!(config.secondary_warmup_timeout, None);
        assert_eq!(config.secondary_download_request_timeout, None);

        // Consistency check that the Default impl agrees with our serde defaults
        assert_eq!(MigrationConfig::default(), config);
    }
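
    /// Sketch of a consistency check (added here, not in the original suite) for the
    /// availability conversions: converting a `NodeAvailabilityWrapper` into a
    /// `NodeAvailability` and back should preserve the variant. The wrapper does not
    /// derive `PartialEq`, so the comparison goes through its `Debug` output.
    #[test]
    fn node_availability_wrapper_roundtrip() {
        for wrapper in [
            NodeAvailabilityWrapper::Active,
            NodeAvailabilityWrapper::WarmingUp,
            NodeAvailabilityWrapper::Offline,
        ] {
            let availability: NodeAvailability = wrapper.into();
            let back: NodeAvailabilityWrapper = availability.into();
            assert_eq!(format!("{wrapper:?}"), format!("{back:?}"));
        }
    }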
}