use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::time::{Duration, Instant};

/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
/// in [`storage_controller::http`]
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;

use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    pub new_tenant_id: TenantShardId,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    // If omitted, create a single shard with TenantShardId::unsharded()
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,
    pub generation: u32,
}

#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    pub shards: Vec<TenantCreateResponseShard>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NodeRegisterRequest {
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,

    pub availability_zone_id: AvailabilityZone,
}

#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    pub node_id: NodeId,

    pub availability: Option<NodeAvailabilityWrapper>,
    pub scheduling: Option<NodeSchedulingPolicy>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    pub placement: Option<PlacementPolicy>,
    pub scheduling: Option<ShardSchedulingPolicy>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
pub struct AvailabilityZone(pub String);

impl Display for AvailabilityZone {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

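/// Sets the preferred availability zone for a batch of tenant shards. Note that because the
/// map is flattened, the request body is a plain JSON object keyed by tenant shard id, with
/// each shard mapped to its preferred AZ (or null, presumably to clear the preference).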
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
    #[serde(flatten)]
    pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
}

#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
    pub updated: Vec<TenantShardId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    pub shard_id: TenantShardId,
    pub node_id: NodeId,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,
}

#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    pub shards: Vec<TenantLocateResponseShard>,
    pub shard_params: ShardParameters,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    pub tenant_id: TenantId,
    pub shards: Vec<TenantDescribeResponseShard>,
    pub stripe_size: ShardStripeSize,
    pub policy: PlacementPolicy,
    pub config: TenantConfig,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
    pub node_id: NodeId,
    pub shards: Vec<NodeShard>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard is observed to be a secondary on this node. True = yes, False = no, None = the shard is not on this node.
    pub is_observed_secondary: Option<bool>,
    /// Whether the shard is intended to be a secondary on this node. True = yes, False = no, None = the shard is not intended to be on this node.
    pub is_intended_secondary: Option<bool>,
}

#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    pub id: NodeId,

    pub availability: NodeAvailabilityWrapper,
    pub scheduling: NodeSchedulingPolicy,

    pub availability_zone_id: String,

    pub listen_http_addr: String,
    pub listen_http_port: u16,
    pub listen_https_port: Option<u16>,

    pub listen_pg_addr: String,
    pub listen_pg_port: u16,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    pub tenant_shard_id: TenantShardId,

    pub node_attached: Option<NodeId>,
    pub node_secondary: Vec<NodeId>,

    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    pub scheduling_policy: ShardSchedulingPolicy,

    pub preferred_az_id: Option<String>,
}

/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low-level operation.
/// TODO: add a higher-level "Reschedule tenant" operation, where the request
/// specifies some constraints, e.g. asking it to get off particular node(s).
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    pub node_id: NodeId,

    /// Optionally, callers may specify the node they are migrating _from_, and the server will
    /// reject the request if the shard is no longer attached there: this enables writing safer
    /// clients that don't risk fighting with some other movement of the shard.
    #[serde(default)]
    pub origin_node_id: Option<NodeId>,

    #[serde(default)]
    pub migration_config: MigrationConfig,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct MigrationConfig {
    /// If true, the migration will be executed even if it is to a location with a sub-optimal scheduling
    /// score: this is usually not what you want, and if you use this then you'll also need to set the
    /// tenant's scheduling policy to Essential or Pause to avoid the optimiser reverting your migration.
    ///
    /// Default: false
    #[serde(default)]
    pub override_scheduler: bool,

    /// If true, the migration will be done gracefully by creating a secondary location first and
    /// waiting for it to warm up before cutting over. If false, if there is no existing secondary
    /// location at the destination, the tenant will be migrated immediately. If the tenant's data
    /// can't be downloaded within [`Self::secondary_warmup_timeout`], then the migration will go
    /// ahead but run with a cold cache that can severely reduce performance until it warms up.
    ///
    /// When doing a graceful migration, the migration API returns as soon as it is started.
    ///
    /// Default: true
    #[serde(default = "default_prewarm")]
    pub prewarm: bool,

    /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
    /// overall for secondary warmup before cutting over
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_warmup_timeout: Option<Duration>,
    /// For non-prewarm migrations which will immediately enter a cutover to the new node: how long to wait
    /// within each secondary download poll call to pageserver.
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub secondary_download_request_timeout: Option<Duration>,
}

fn default_prewarm() -> bool {
    true
}

impl Default for MigrationConfig {
    fn default() -> Self {
        Self {
            override_scheduler: false,
            prewarm: default_prewarm(),
            secondary_warmup_timeout: None,
            secondary_download_request_timeout: None,
        }
    }
}

#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    // Normal, happy state
    Active(PageserverUtilization),
    // Node is warming up, but we expect it to become available soon. Covers
    // the time span between the re-attach response being composed on the storage controller
    // and the first successful heartbeat after the processing of the re-attach response
    // finishes on the pageserver.
    WarmingUp(Instant),
    // Offline: Tenants shouldn't try to attach here, but they may assume that their
    // secondary locations on this node still exist. Newly added nodes are in this
    // state until we successfully contact them.
    Offline,
}

impl PartialEq for NodeAvailability {
    fn eq(&self, other: &Self) -> bool {
        use NodeAvailability::*;
        matches!(
            (self, other),
            (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
        )
    }
}

impl Eq for NodeAvailability {}

// This wrapper provides serde functionality and it should only be used to
// communicate with external callers which don't know or care about the
// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    Active,
    WarmingUp,
    Offline,
}

impl From<NodeAvailabilityWrapper> for NodeAvailability {
    fn from(val: NodeAvailabilityWrapper) -> Self {
        match val {
            // Assume the worst utilisation score to begin with. It will later be updated by
            // the heartbeats.
            NodeAvailabilityWrapper::Active => {
                NodeAvailability::Active(PageserverUtilization::full())
            }
            NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
            NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
        }
    }
}

impl From<NodeAvailability> for NodeAvailabilityWrapper {
    fn from(val: NodeAvailability) -> Self {
        match val {
            NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
            NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
            NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
        }
    }
}

/// Scheduling policy enables us to selectively disable some automatic actions that the
/// controller performs on a tenant shard. This is only set to a non-default value by
/// human intervention, and it is reset to the default value (Active) when the tenant's
/// placement policy is modified away from Attached.
///
/// The typical use of a non-Active scheduling policy is one of:
/// - Pinning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
///
/// If you're not sure which policy to use to pin a shard to its current location, you probably
/// want Pause.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    // Normal mode: the tenant's scheduled locations may be updated at will, including
    // for non-essential optimization.
    Active,

    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    // For example, this still permits a node's attachment location to change to a secondary in
    // response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    // No scheduling: leave the shard running wherever it currently is. Even if the shard is
    // unavailable, it will not be rescheduled to another node.
    Pause,

    // No reconciling: we will make no location_conf API calls to pageservers at all. If the
    // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
    Stop,
}

impl Default for ShardSchedulingPolicy {
    fn default() -> Self {
        Self::Active
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> String {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
    Active,
    Pause,
    Decomissioned,
}

impl FromStr for SkSchedulingPolicy {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "active" => Self::Active,
            "pause" => Self::Pause,
            "decomissioned" => Self::Decomissioned,
            _ => {
                return Err(anyhow::anyhow!(
                    "Unknown scheduling policy '{s}', try active,pause,decomissioned"
                ));
            }
        })
    }
}

impl From<SkSchedulingPolicy> for String {
    fn from(value: SkSchedulingPolicy) -> String {
        use SkSchedulingPolicy::*;
        match value {
            Active => "active",
            Pause => "pause",
            Decomissioned => "decomissioned",
        }
        .to_string()
    }
}

/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    Attached(usize),
426 : /// a tenant, or for an idle tenant that we might want to bring online quickly.
427 : Secondary,
428 :
429 : /// Do not attach to any pageservers. This is appropriate for tenants that
430 : /// have been idle for a long time, where we do not mind some delay in making
431 : /// them available in future.
432 : Detached,
433 : }
434 :
435 : impl PlacementPolicy {
436 55 : pub fn want_secondaries(&self) -> usize {
437 55 : match self {
438 52 : PlacementPolicy::Attached(secondary_count) => *secondary_count,
439 3 : PlacementPolicy::Secondary => 1,
440 0 : PlacementPolicy::Detached => 0,
441 : }
442 55 : }
443 : }
444 :
445 0 : #[derive(Serialize, Deserialize, Debug)]
446 : pub struct TenantShardMigrateResponse {}
447 :
448 : /// Metadata health record posted from scrubber.
449 0 : #[derive(Serialize, Deserialize, Debug)]
450 : pub struct MetadataHealthRecord {
451 : pub tenant_shard_id: TenantShardId,
452 : pub healthy: bool,
453 : pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
454 : }
455 :
456 0 : #[derive(Serialize, Deserialize, Debug)]
457 : pub struct MetadataHealthUpdateRequest {
458 : pub healthy_tenant_shards: HashSet<TenantShardId>,
459 : pub unhealthy_tenant_shards: HashSet<TenantShardId>,
460 : }
461 :
462 0 : #[derive(Serialize, Deserialize, Debug)]
463 : pub struct MetadataHealthUpdateResponse {}
464 :
465 0 : #[derive(Serialize, Deserialize, Debug)]
466 : pub struct MetadataHealthListUnhealthyResponse {
467 : pub unhealthy_tenant_shards: Vec<TenantShardId>,
468 : }
469 :
470 0 : #[derive(Serialize, Deserialize, Debug)]
471 : pub struct MetadataHealthListOutdatedRequest {
472 : #[serde(with = "humantime_serde")]
473 : pub not_scrubbed_for: Duration,
474 : }
475 :
476 0 : #[derive(Serialize, Deserialize, Debug)]
477 : pub struct MetadataHealthListOutdatedResponse {
478 : pub health_records: Vec<MetadataHealthRecord>,
479 : }
480 :
481 : /// Publicly exposed safekeeper description
482 0 : #[derive(Serialize, Deserialize, Clone)]
483 : pub struct SafekeeperDescribeResponse {
484 : pub id: NodeId,
485 : pub region_id: String,
486 : /// 1 is special, it means just created (not currently posted to storcon).
487 : /// Zero or negative is not really expected.
488 : /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
489 : pub version: i64,
490 : pub host: String,
491 : pub port: i32,
492 : pub http_port: i32,
493 : pub https_port: Option<i32>,
494 : pub availability_zone_id: String,
495 : pub scheduling_policy: SkSchedulingPolicy,
496 : }
497 :
498 0 : #[derive(Serialize, Deserialize, Clone)]
499 : pub struct SafekeeperSchedulingPolicyRequest {
500 : pub scheduling_policy: SkSchedulingPolicy,
501 : }
502 :
503 : /// Import request for safekeeper timelines.
504 0 : #[derive(Serialize, Deserialize, Clone)]
505 : pub struct TimelineImportRequest {
506 : pub tenant_id: TenantId,
507 : pub timeline_id: TimelineId,
508 : pub start_lsn: Lsn,
509 : pub sk_set: Vec<NodeId>,
510 : }
511 :
512 : #[cfg(test)]
513 : mod test {
514 : use serde_json;
515 :
516 : use super::*;
517 :
518 : /// Check stability of PlacementPolicy's serialization
519 : #[test]
520 1 : fn placement_policy_encoding() -> anyhow::Result<()> {
521 1 : let v = PlacementPolicy::Attached(1);
522 1 : let encoded = serde_json::to_string(&v)?;
523 1 : assert_eq!(encoded, "{\"Attached\":1}");
524 1 : assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
525 :
526 1 : let v = PlacementPolicy::Detached;
527 1 : let encoded = serde_json::to_string(&v)?;
528 1 : assert_eq!(encoded, "\"Detached\"");
529 1 : assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
530 1 : Ok(())
531 1 : }
532 :
533 : #[test]
534 1 : fn test_reject_unknown_field() {
535 1 : let id = TenantId::generate();
536 1 : let create_request = serde_json::json!({
537 1 : "new_tenant_id": id.to_string(),
538 1 : "unknown_field": "unknown_value".to_string(),
539 1 : });
540 1 : let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
541 1 : assert!(
542 1 : err.to_string().contains("unknown field `unknown_field`"),
543 0 : "expect unknown field `unknown_field` error, got: {}",
544 : err
545 : );
546 1 : }
547 :
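    /// Illustrative round-trip sketch for NodeSchedulingPolicy's string encoding: converting a
    /// policy to a String (via `From`) and parsing it back (via `FromStr`) should be lossless.
    #[test]
    fn node_scheduling_policy_string_roundtrip() -> anyhow::Result<()> {
        for policy in [
            NodeSchedulingPolicy::Active,
            NodeSchedulingPolicy::Filling,
            NodeSchedulingPolicy::Pause,
            NodeSchedulingPolicy::PauseForRestart,
            NodeSchedulingPolicy::Draining,
        ] {
            // Encode with From<NodeSchedulingPolicy> for String, decode with FromStr.
            let encoded: String = policy.into();
            let decoded: NodeSchedulingPolicy = encoded.parse()?;
            assert_eq!(decoded, policy);
        }
        Ok(())
    }
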
    /// Check that a minimal migrate request with no config results in the expected default settings
    #[test]
    fn test_migrate_request_decode_defaults() {
        let json = r#"{
            "node_id": 123
        }"#;

        let request: TenantShardMigrateRequest = serde_json::from_str(json).unwrap();
        assert_eq!(request.node_id, NodeId(123));
        assert_eq!(request.origin_node_id, None);
        assert!(!request.migration_config.override_scheduler);
        assert!(request.migration_config.prewarm);
        assert_eq!(request.migration_config.secondary_warmup_timeout, None);
        assert_eq!(
            request.migration_config.secondary_download_request_timeout,
            None
        );
    }

568 : #[test]
569 1 : fn test_migration_config_decode_defaults() {
570 1 : // Specify just one field of the config
571 1 : let json = r#"{
572 1 : }"#;
573 1 :
574 1 : let config: MigrationConfig = serde_json::from_str(json).unwrap();
575 1 :
576 1 : // Check each field's expected default value
577 1 : assert!(!config.override_scheduler);
578 1 : assert!(config.prewarm);
579 1 : assert_eq!(config.secondary_warmup_timeout, None);
580 1 : assert_eq!(config.secondary_download_request_timeout, None);
581 1 : assert_eq!(config.secondary_warmup_timeout, None);
582 :
583 : // Consistency check that the Default impl agrees with our serde defaults
584 1 : assert_eq!(MigrationConfig::default(), config);
585 1 : }
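
    /// Illustrative sketch of decoding explicitly provided migration config fields. Assumption
    /// worth flagging: a humantime-style duration string such as "30s" is accepted by
    /// `humantime_serde` for the `Option<Duration>` fields.
    #[test]
    fn test_migration_config_decode_durations() {
        let json = r#"{
            "prewarm": false,
            "secondary_warmup_timeout": "30s"
        }"#;

        let config: MigrationConfig = serde_json::from_str(json).unwrap();

        assert!(!config.prewarm);
        assert_eq!(
            config.secondary_warmup_timeout,
            Some(Duration::from_secs(30))
        );
        assert_eq!(config.secondary_download_request_timeout, None);
    }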
}