Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::fmt::Display;
3 : use std::str::FromStr;
4 : use std::time::{Duration, Instant};
5 :
6 : /// Request/response types for the storage controller
7 : /// API (`/control/v1` prefix). Implemented by the server
8 : /// in [`storage_controller::http`]
9 : use serde::{Deserialize, Serialize};
10 : use utils::id::{NodeId, TenantId};
11 :
12 : use crate::models::PageserverUtilization;
13 : use crate::{
14 : models::{ShardParameters, TenantConfig},
15 : shard::{ShardStripeSize, TenantShardId},
16 : };
17 :
18 2 : #[derive(Serialize, Deserialize, Debug)]
19 : #[serde(deny_unknown_fields)]
20 : pub struct TenantCreateRequest {
21 : pub new_tenant_id: TenantShardId,
22 : #[serde(default)]
23 : #[serde(skip_serializing_if = "Option::is_none")]
24 : pub generation: Option<u32>,
25 :
26 : // If omitted, create a single shard with TenantShardId::unsharded()
27 : #[serde(default)]
28 : #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
29 : pub shard_parameters: ShardParameters,
30 :
31 : #[serde(default)]
32 : #[serde(skip_serializing_if = "Option::is_none")]
33 : pub placement_policy: Option<PlacementPolicy>,
34 :
35 : #[serde(flatten)]
36 : pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
37 : }
38 :
39 0 : #[derive(Serialize, Deserialize)]
40 : pub struct TenantCreateResponseShard {
41 : pub shard_id: TenantShardId,
42 : pub node_id: NodeId,
43 : pub generation: u32,
44 : }
45 :
46 0 : #[derive(Serialize, Deserialize)]
47 : pub struct TenantCreateResponse {
48 : pub shards: Vec<TenantCreateResponseShard>,
49 : }
50 :
51 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
52 : pub struct NodeRegisterRequest {
53 : pub node_id: NodeId,
54 :
55 : pub listen_pg_addr: String,
56 : pub listen_pg_port: u16,
57 :
58 : pub listen_http_addr: String,
59 : pub listen_http_port: u16,
60 : pub listen_https_port: Option<u16>,
61 :
62 : pub availability_zone_id: AvailabilityZone,
63 : }
64 :
65 0 : #[derive(Serialize, Deserialize)]
66 : pub struct NodeConfigureRequest {
67 : pub node_id: NodeId,
68 :
69 : pub availability: Option<NodeAvailabilityWrapper>,
70 : pub scheduling: Option<NodeSchedulingPolicy>,
71 : }
72 :
73 0 : #[derive(Serialize, Deserialize)]
74 : pub struct TenantPolicyRequest {
75 : pub placement: Option<PlacementPolicy>,
76 : pub scheduling: Option<ShardSchedulingPolicy>,
77 : }
78 :
79 0 : #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
80 : pub struct AvailabilityZone(pub String);
81 :
82 : impl Display for AvailabilityZone {
83 300 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 300 : write!(f, "{}", self.0)
85 300 : }
86 : }
87 :
88 0 : #[derive(Serialize, Deserialize)]
89 : pub struct ShardsPreferredAzsRequest {
90 : #[serde(flatten)]
91 : pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
92 : }
93 :
94 0 : #[derive(Serialize, Deserialize)]
95 : pub struct ShardsPreferredAzsResponse {
96 : pub updated: Vec<TenantShardId>,
97 : }
98 :
99 0 : #[derive(Serialize, Deserialize, Debug)]
100 : pub struct TenantLocateResponseShard {
101 : pub shard_id: TenantShardId,
102 : pub node_id: NodeId,
103 :
104 : pub listen_pg_addr: String,
105 : pub listen_pg_port: u16,
106 :
107 : pub listen_http_addr: String,
108 : pub listen_http_port: u16,
109 : pub listen_https_port: Option<u16>,
110 : }
111 :
112 0 : #[derive(Serialize, Deserialize)]
113 : pub struct TenantLocateResponse {
114 : pub shards: Vec<TenantLocateResponseShard>,
115 : pub shard_params: ShardParameters,
116 : }
117 :
118 0 : #[derive(Serialize, Deserialize, Debug)]
119 : pub struct TenantDescribeResponse {
120 : pub tenant_id: TenantId,
121 : pub shards: Vec<TenantDescribeResponseShard>,
122 : pub stripe_size: ShardStripeSize,
123 : pub policy: PlacementPolicy,
124 : pub config: TenantConfig,
125 : }
126 :
127 0 : #[derive(Serialize, Deserialize, Debug)]
128 : pub struct NodeShardResponse {
129 : pub node_id: NodeId,
130 : pub shards: Vec<NodeShard>,
131 : }
132 :
133 0 : #[derive(Serialize, Deserialize, Debug)]
134 : pub struct NodeShard {
135 : pub tenant_shard_id: TenantShardId,
136 : /// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
137 : pub is_observed_secondary: Option<bool>,
138 : /// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
139 : pub is_intended_secondary: Option<bool>,
140 : }
141 :
142 0 : #[derive(Serialize, Deserialize)]
143 : pub struct NodeDescribeResponse {
144 : pub id: NodeId,
145 :
146 : pub availability: NodeAvailabilityWrapper,
147 : pub scheduling: NodeSchedulingPolicy,
148 :
149 : pub availability_zone_id: String,
150 :
151 : pub listen_http_addr: String,
152 : pub listen_http_port: u16,
153 : pub listen_https_port: Option<u16>,
154 :
155 : pub listen_pg_addr: String,
156 : pub listen_pg_port: u16,
157 : }
158 :
159 0 : #[derive(Serialize, Deserialize, Debug)]
160 : pub struct TenantDescribeResponseShard {
161 : pub tenant_shard_id: TenantShardId,
162 :
163 : pub node_attached: Option<NodeId>,
164 : pub node_secondary: Vec<NodeId>,
165 :
166 : pub last_error: String,
167 :
168 : /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
169 : pub is_reconciling: bool,
170 : /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
171 : pub is_pending_compute_notification: bool,
172 : /// A shard split is currently underway
173 : pub is_splitting: bool,
174 :
175 : pub scheduling_policy: ShardSchedulingPolicy,
176 :
177 : pub preferred_az_id: Option<String>,
178 : }
179 :
180 : /// Migration request for a given tenant shard to a given node.
181 : ///
182 : /// Explicitly migrating a particular shard is a low level operation
183 : /// TODO: higher level "Reschedule tenant" operation where the request
184 : /// specifies some constraints, e.g. asking it to get off particular node(s)
185 0 : #[derive(Serialize, Deserialize, Debug)]
186 : pub struct TenantShardMigrateRequest {
187 : pub node_id: NodeId,
188 : #[serde(default)]
189 : pub migration_config: Option<MigrationConfig>,
190 : }
191 :
192 0 : #[derive(Serialize, Deserialize, Debug)]
193 : pub struct MigrationConfig {
194 : #[serde(default)]
195 : #[serde(with = "humantime_serde")]
196 : pub secondary_warmup_timeout: Option<Duration>,
197 : #[serde(default)]
198 : #[serde(with = "humantime_serde")]
199 : pub secondary_download_request_timeout: Option<Duration>,
200 : }
201 :
202 : #[derive(Serialize, Clone, Debug)]
203 : #[serde(into = "NodeAvailabilityWrapper")]
204 : pub enum NodeAvailability {
205 : // Normal, happy state
206 : Active(PageserverUtilization),
207 : // Node is warming up, but we expect it to become available soon. Covers
208 : // the time span between the re-attach response being composed on the storage controller
209 : // and the first successful heartbeat after the processing of the re-attach response
210 : // finishes on the pageserver.
211 : WarmingUp(Instant),
212 : // Offline: Tenants shouldn't try to attach here, but they may assume that their
213 : // secondary locations on this node still exist. Newly added nodes are in this
214 : // state until we successfully contact them.
215 : Offline,
216 : }
217 :
218 : impl PartialEq for NodeAvailability {
219 0 : fn eq(&self, other: &Self) -> bool {
220 : use NodeAvailability::*;
221 0 : matches!(
222 0 : (self, other),
223 : (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
224 : )
225 0 : }
226 : }
227 :
228 : impl Eq for NodeAvailability {}
229 :
230 : // This wrapper provides serde functionality and it should only be used to
231 : // communicate with external callers which don't know or care about the
232 : // utilisation score of the pageserver it is targeting.
233 0 : #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
234 : pub enum NodeAvailabilityWrapper {
235 : Active,
236 : WarmingUp,
237 : Offline,
238 : }
239 :
240 : impl From<NodeAvailabilityWrapper> for NodeAvailability {
241 0 : fn from(val: NodeAvailabilityWrapper) -> Self {
242 0 : match val {
243 : // Assume the worst utilisation score to begin with. It will later be updated by
244 : // the heartbeats.
245 : NodeAvailabilityWrapper::Active => {
246 0 : NodeAvailability::Active(PageserverUtilization::full())
247 : }
248 0 : NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
249 0 : NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
250 : }
251 0 : }
252 : }
253 :
254 : impl From<NodeAvailability> for NodeAvailabilityWrapper {
255 0 : fn from(val: NodeAvailability) -> Self {
256 0 : match val {
257 0 : NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
258 0 : NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
259 0 : NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
260 : }
261 0 : }
262 : }
263 :
264 : /// Scheduling policy enables us to selectively disable some automatic actions that the
265 : /// controller performs on a tenant shard. This is only set to a non-default value by
266 : /// human intervention, and it is reset to the default value (Active) when the tenant's
267 : /// placement policy is modified away from Attached.
268 : ///
269 : /// The typical use of a non-Active scheduling policy is one of:
270 : /// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
271 : /// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
272 : ///
273 : /// If you're not sure which policy to use to pin a shard to its current location, you probably
274 : /// want Pause.
275 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
276 : pub enum ShardSchedulingPolicy {
277 : // Normal mode: the tenant's scheduled locations may be updated at will, including
278 : // for non-essential optimization.
279 : Active,
280 :
281 : // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
282 : // For example, this still permits a node's attachment location to change to a secondary in
283 : // response to a node failure, or to assign a new secondary if a node was removed.
284 : Essential,
285 :
286 : // No scheduling: leave the shard running wherever it currently is. Even if the shard is
287 : // unavailable, it will not be rescheduled to another node.
288 : Pause,
289 :
290 : // No reconciling: we will make no location_conf API calls to pageservers at all. If the
291 : // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
292 : Stop,
293 : }
294 :
295 : impl Default for ShardSchedulingPolicy {
296 12841 : fn default() -> Self {
297 12841 : Self::Active
298 12841 : }
299 : }
300 :
301 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
302 : pub enum NodeSchedulingPolicy {
303 : Active,
304 : Filling,
305 : Pause,
306 : PauseForRestart,
307 : Draining,
308 : }
309 :
310 : impl FromStr for NodeSchedulingPolicy {
311 : type Err = anyhow::Error;
312 :
313 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
314 0 : match s {
315 0 : "active" => Ok(Self::Active),
316 0 : "filling" => Ok(Self::Filling),
317 0 : "pause" => Ok(Self::Pause),
318 0 : "pause_for_restart" => Ok(Self::PauseForRestart),
319 0 : "draining" => Ok(Self::Draining),
320 0 : _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
321 : }
322 0 : }
323 : }
324 :
325 : impl From<NodeSchedulingPolicy> for String {
326 0 : fn from(value: NodeSchedulingPolicy) -> String {
327 : use NodeSchedulingPolicy::*;
328 0 : match value {
329 0 : Active => "active",
330 0 : Filling => "filling",
331 0 : Pause => "pause",
332 0 : PauseForRestart => "pause_for_restart",
333 0 : Draining => "draining",
334 : }
335 0 : .to_string()
336 0 : }
337 : }
338 :
339 0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
340 : pub enum SkSchedulingPolicy {
341 : Active,
342 : Pause,
343 : Decomissioned,
344 : }
345 :
346 : impl FromStr for SkSchedulingPolicy {
347 : type Err = anyhow::Error;
348 :
349 0 : fn from_str(s: &str) -> Result<Self, Self::Err> {
350 0 : Ok(match s {
351 0 : "active" => Self::Active,
352 0 : "pause" => Self::Pause,
353 0 : "decomissioned" => Self::Decomissioned,
354 : _ => {
355 0 : return Err(anyhow::anyhow!(
356 0 : "Unknown scheduling policy '{s}', try active,pause,decomissioned"
357 0 : ))
358 : }
359 : })
360 0 : }
361 : }
362 :
363 : impl From<SkSchedulingPolicy> for String {
364 0 : fn from(value: SkSchedulingPolicy) -> String {
365 : use SkSchedulingPolicy::*;
366 0 : match value {
367 0 : Active => "active",
368 0 : Pause => "pause",
369 0 : Decomissioned => "decomissioned",
370 : }
371 0 : .to_string()
372 0 : }
373 : }
374 :
375 : /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
376 : /// to create secondary locations.
377 2 : #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
378 : pub enum PlacementPolicy {
379 : /// Normal live state: one attached pageserver and zero or more secondaries.
380 : Attached(usize),
381 : /// Create one secondary mode locations. This is useful when onboarding
382 : /// a tenant, or for an idle tenant that we might want to bring online quickly.
383 : Secondary,
384 :
385 : /// Do not attach to any pageservers. This is appropriate for tenants that
386 : /// have been idle for a long time, where we do not mind some delay in making
387 : /// them available in future.
388 : Detached,
389 : }
390 :
391 : impl PlacementPolicy {
392 53 : pub fn want_secondaries(&self) -> usize {
393 53 : match self {
394 50 : PlacementPolicy::Attached(secondary_count) => *secondary_count,
395 3 : PlacementPolicy::Secondary => 1,
396 0 : PlacementPolicy::Detached => 0,
397 : }
398 53 : }
399 : }
400 :
401 0 : #[derive(Serialize, Deserialize, Debug)]
402 : pub struct TenantShardMigrateResponse {}
403 :
404 : /// Metadata health record posted from scrubber.
405 0 : #[derive(Serialize, Deserialize, Debug)]
406 : pub struct MetadataHealthRecord {
407 : pub tenant_shard_id: TenantShardId,
408 : pub healthy: bool,
409 : pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
410 : }
411 :
412 0 : #[derive(Serialize, Deserialize, Debug)]
413 : pub struct MetadataHealthUpdateRequest {
414 : pub healthy_tenant_shards: HashSet<TenantShardId>,
415 : pub unhealthy_tenant_shards: HashSet<TenantShardId>,
416 : }
417 :
418 0 : #[derive(Serialize, Deserialize, Debug)]
419 : pub struct MetadataHealthUpdateResponse {}
420 :
421 0 : #[derive(Serialize, Deserialize, Debug)]
422 : pub struct MetadataHealthListUnhealthyResponse {
423 : pub unhealthy_tenant_shards: Vec<TenantShardId>,
424 : }
425 :
426 0 : #[derive(Serialize, Deserialize, Debug)]
427 : pub struct MetadataHealthListOutdatedRequest {
428 : #[serde(with = "humantime_serde")]
429 : pub not_scrubbed_for: Duration,
430 : }
431 :
432 0 : #[derive(Serialize, Deserialize, Debug)]
433 : pub struct MetadataHealthListOutdatedResponse {
434 : pub health_records: Vec<MetadataHealthRecord>,
435 : }
436 :
437 : /// Publicly exposed safekeeper description
438 0 : #[derive(Serialize, Deserialize, Clone)]
439 : pub struct SafekeeperDescribeResponse {
440 : pub id: NodeId,
441 : pub region_id: String,
442 : /// 1 is special, it means just created (not currently posted to storcon).
443 : /// Zero or negative is not really expected.
444 : /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
445 : pub version: i64,
446 : pub host: String,
447 : pub port: i32,
448 : pub http_port: i32,
449 : pub availability_zone_id: String,
450 : pub scheduling_policy: SkSchedulingPolicy,
451 : }
452 :
453 0 : #[derive(Serialize, Deserialize, Clone)]
454 : pub struct SafekeeperSchedulingPolicyRequest {
455 : pub scheduling_policy: SkSchedulingPolicy,
456 : }
457 :
#[cfg(test)]
mod test {
    // `serde_json` is referenced by path; a bare `use serde_json;` is redundant.
    use super::*;

    /// Check stability of PlacementPolicy's serialization
    #[test]
    fn placement_policy_encoding() -> anyhow::Result<()> {
        // Every variant is pinned to its JSON encoding and must round-trip.
        let cases = [
            (PlacementPolicy::Attached(1), "{\"Attached\":1}"),
            (PlacementPolicy::Secondary, "\"Secondary\""),
            (PlacementPolicy::Detached, "\"Detached\""),
        ];

        for (v, expected) in cases {
            let encoded = serde_json::to_string(&v)?;
            assert_eq!(encoded, expected);
            assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
        }
        Ok(())
    }

    /// A field that neither the request nor its flattened config knows must
    /// be rejected at deserialization time.
    #[test]
    fn test_reject_unknown_field() {
        let id = TenantId::generate();
        let create_request = serde_json::json!({
            "new_tenant_id": id.to_string(),
            "unknown_field": "unknown_value".to_string(),
        });
        let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
        assert!(
            err.to_string().contains("unknown field `unknown_field`"),
            "expect unknown field `unknown_field` error, got: {}",
            err
        );
    }
}
|