LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - controller_api.rs (source / functions) Coverage Total Hit
Test: 45c9170b95180e9ecfad9a53e031030abf2a178c.info Lines: 29.3 % 116 34
Test Date: 2025-02-21 15:51:08 Functions: 2.2 % 314 7

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::fmt::Display;
       3              : use std::str::FromStr;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : /// Request/response types for the storage controller
       7              : /// API (`/control/v1` prefix).  Implemented by the server
       8              : /// in [`storage_controller::http`]
       9              : use serde::{Deserialize, Serialize};
      10              : use utils::id::{NodeId, TenantId};
      11              : 
      12              : use crate::models::PageserverUtilization;
      13              : use crate::{
      14              :     models::{ShardParameters, TenantConfig},
      15              :     shard::{ShardStripeSize, TenantShardId},
      16              : };
      17              : 
      18            2 : #[derive(Serialize, Deserialize, Debug)]
      19              : #[serde(deny_unknown_fields)]
      20              : pub struct TenantCreateRequest {
      21              :     pub new_tenant_id: TenantShardId,
      22              :     #[serde(default)]
      23              :     #[serde(skip_serializing_if = "Option::is_none")]
      24              :     pub generation: Option<u32>,
      25              : 
      26              :     // If omitted, create a single shard with TenantShardId::unsharded()
      27              :     #[serde(default)]
      28              :     #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
      29              :     pub shard_parameters: ShardParameters,
      30              : 
      31              :     #[serde(default)]
      32              :     #[serde(skip_serializing_if = "Option::is_none")]
      33              :     pub placement_policy: Option<PlacementPolicy>,
      34              : 
      35              :     #[serde(flatten)]
      36              :     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
      37              : }
      38              : 
      39            0 : #[derive(Serialize, Deserialize)]
      40              : pub struct TenantCreateResponseShard {
      41              :     pub shard_id: TenantShardId,
      42              :     pub node_id: NodeId,
      43              :     pub generation: u32,
      44              : }
      45              : 
      46            0 : #[derive(Serialize, Deserialize)]
      47              : pub struct TenantCreateResponse {
      48              :     pub shards: Vec<TenantCreateResponseShard>,
      49              : }
      50              : 
      51            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      52              : pub struct NodeRegisterRequest {
      53              :     pub node_id: NodeId,
      54              : 
      55              :     pub listen_pg_addr: String,
      56              :     pub listen_pg_port: u16,
      57              : 
      58              :     pub listen_http_addr: String,
      59              :     pub listen_http_port: u16,
      60              :     pub listen_https_port: Option<u16>,
      61              : 
      62              :     pub availability_zone_id: AvailabilityZone,
      63              : }
      64              : 
      65            0 : #[derive(Serialize, Deserialize)]
      66              : pub struct NodeConfigureRequest {
      67              :     pub node_id: NodeId,
      68              : 
      69              :     pub availability: Option<NodeAvailabilityWrapper>,
      70              :     pub scheduling: Option<NodeSchedulingPolicy>,
      71              : }
      72              : 
      73            0 : #[derive(Serialize, Deserialize)]
      74              : pub struct TenantPolicyRequest {
      75              :     pub placement: Option<PlacementPolicy>,
      76              :     pub scheduling: Option<ShardSchedulingPolicy>,
      77              : }
      78              : 
      79            0 : #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, PartialOrd, Ord)]
      80              : pub struct AvailabilityZone(pub String);
      81              : 
      82              : impl Display for AvailabilityZone {
      83          300 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      84          300 :         write!(f, "{}", self.0)
      85          300 :     }
      86              : }
      87              : 
      88            0 : #[derive(Serialize, Deserialize)]
      89              : pub struct ShardsPreferredAzsRequest {
      90              :     #[serde(flatten)]
      91              :     pub preferred_az_ids: HashMap<TenantShardId, Option<AvailabilityZone>>,
      92              : }
      93              : 
      94            0 : #[derive(Serialize, Deserialize)]
      95              : pub struct ShardsPreferredAzsResponse {
      96              :     pub updated: Vec<TenantShardId>,
      97              : }
      98              : 
      99            0 : #[derive(Serialize, Deserialize, Debug)]
     100              : pub struct TenantLocateResponseShard {
     101              :     pub shard_id: TenantShardId,
     102              :     pub node_id: NodeId,
     103              : 
     104              :     pub listen_pg_addr: String,
     105              :     pub listen_pg_port: u16,
     106              : 
     107              :     pub listen_http_addr: String,
     108              :     pub listen_http_port: u16,
     109              :     pub listen_https_port: Option<u16>,
     110              : }
     111              : 
     112            0 : #[derive(Serialize, Deserialize)]
     113              : pub struct TenantLocateResponse {
     114              :     pub shards: Vec<TenantLocateResponseShard>,
     115              :     pub shard_params: ShardParameters,
     116              : }
     117              : 
     118            0 : #[derive(Serialize, Deserialize, Debug)]
     119              : pub struct TenantDescribeResponse {
     120              :     pub tenant_id: TenantId,
     121              :     pub shards: Vec<TenantDescribeResponseShard>,
     122              :     pub stripe_size: ShardStripeSize,
     123              :     pub policy: PlacementPolicy,
     124              :     pub config: TenantConfig,
     125              : }
     126              : 
     127            0 : #[derive(Serialize, Deserialize, Debug)]
     128              : pub struct NodeShardResponse {
     129              :     pub node_id: NodeId,
     130              :     pub shards: Vec<NodeShard>,
     131              : }
     132              : 
     133            0 : #[derive(Serialize, Deserialize, Debug)]
     134              : pub struct NodeShard {
     135              :     pub tenant_shard_id: TenantShardId,
     136              :     /// Whether the shard is observed secondary on a specific node. True = yes, False = no, None = not on this node.
     137              :     pub is_observed_secondary: Option<bool>,
     138              :     /// Whether the shard is intended to be a secondary on a specific node. True = yes, False = no, None = not on this node.
     139              :     pub is_intended_secondary: Option<bool>,
     140              : }
     141              : 
     142            0 : #[derive(Serialize, Deserialize)]
     143              : pub struct NodeDescribeResponse {
     144              :     pub id: NodeId,
     145              : 
     146              :     pub availability: NodeAvailabilityWrapper,
     147              :     pub scheduling: NodeSchedulingPolicy,
     148              : 
     149              :     pub availability_zone_id: String,
     150              : 
     151              :     pub listen_http_addr: String,
     152              :     pub listen_http_port: u16,
     153              :     pub listen_https_port: Option<u16>,
     154              : 
     155              :     pub listen_pg_addr: String,
     156              :     pub listen_pg_port: u16,
     157              : }
     158              : 
     159            0 : #[derive(Serialize, Deserialize, Debug)]
     160              : pub struct TenantDescribeResponseShard {
     161              :     pub tenant_shard_id: TenantShardId,
     162              : 
     163              :     pub node_attached: Option<NodeId>,
     164              :     pub node_secondary: Vec<NodeId>,
     165              : 
     166              :     pub last_error: String,
     167              : 
     168              :     /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
     169              :     pub is_reconciling: bool,
     170              :     /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
     171              :     pub is_pending_compute_notification: bool,
     172              :     /// A shard split is currently underway
     173              :     pub is_splitting: bool,
     174              : 
     175              :     pub scheduling_policy: ShardSchedulingPolicy,
     176              : 
     177              :     pub preferred_az_id: Option<String>,
     178              : }
     179              : 
     180              : /// Migration request for a given tenant shard to a given node.
     181              : ///
     182              : /// Explicitly migrating a particular shard is a low level operation
     183              : /// TODO: higher level "Reschedule tenant" operation where the request
     184              : /// specifies some constraints, e.g. asking it to get off particular node(s)
     185            0 : #[derive(Serialize, Deserialize, Debug)]
     186              : pub struct TenantShardMigrateRequest {
     187              :     pub node_id: NodeId,
     188              :     #[serde(default)]
     189              :     pub migration_config: Option<MigrationConfig>,
     190              : }
     191              : 
     192            0 : #[derive(Serialize, Deserialize, Debug)]
     193              : pub struct MigrationConfig {
     194              :     #[serde(default)]
     195              :     #[serde(with = "humantime_serde")]
     196              :     pub secondary_warmup_timeout: Option<Duration>,
     197              :     #[serde(default)]
     198              :     #[serde(with = "humantime_serde")]
     199              :     pub secondary_download_request_timeout: Option<Duration>,
     200              : }
     201              : 
     202              : #[derive(Serialize, Clone, Debug)]
     203              : #[serde(into = "NodeAvailabilityWrapper")]
     204              : pub enum NodeAvailability {
     205              :     // Normal, happy state
     206              :     Active(PageserverUtilization),
     207              :     // Node is warming up, but we expect it to become available soon. Covers
     208              :     // the time span between the re-attach response being composed on the storage controller
     209              :     // and the first successful heartbeat after the processing of the re-attach response
     210              :     // finishes on the pageserver.
     211              :     WarmingUp(Instant),
     212              :     // Offline: Tenants shouldn't try to attach here, but they may assume that their
     213              :     // secondary locations on this node still exist.  Newly added nodes are in this
     214              :     // state until we successfully contact them.
     215              :     Offline,
     216              : }
     217              : 
     218              : impl PartialEq for NodeAvailability {
     219            0 :     fn eq(&self, other: &Self) -> bool {
     220              :         use NodeAvailability::*;
     221            0 :         matches!(
     222            0 :             (self, other),
     223              :             (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
     224              :         )
     225            0 :     }
     226              : }
     227              : 
     228              : impl Eq for NodeAvailability {}
     229              : 
     230              : // This wrapper provides serde functionality and it should only be used to
     231              : // communicate with external callers which don't know or care about the
     232              : // utilisation score of the pageserver it is targeting.
     233            0 : #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
     234              : pub enum NodeAvailabilityWrapper {
     235              :     Active,
     236              :     WarmingUp,
     237              :     Offline,
     238              : }
     239              : 
     240              : impl From<NodeAvailabilityWrapper> for NodeAvailability {
     241            0 :     fn from(val: NodeAvailabilityWrapper) -> Self {
     242            0 :         match val {
     243              :             // Assume the worst utilisation score to begin with. It will later be updated by
     244              :             // the heartbeats.
     245              :             NodeAvailabilityWrapper::Active => {
     246            0 :                 NodeAvailability::Active(PageserverUtilization::full())
     247              :             }
     248            0 :             NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
     249            0 :             NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
     250              :         }
     251            0 :     }
     252              : }
     253              : 
     254              : impl From<NodeAvailability> for NodeAvailabilityWrapper {
     255            0 :     fn from(val: NodeAvailability) -> Self {
     256            0 :         match val {
     257            0 :             NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
     258            0 :             NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
     259            0 :             NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
     260              :         }
     261            0 :     }
     262              : }
     263              : 
     264              : /// Scheduling policy enables us to selectively disable some automatic actions that the
     265              : /// controller performs on a tenant shard. This is only set to a non-default value by
     266              : /// human intervention, and it is reset to the default value (Active) when the tenant's
     267              : /// placement policy is modified away from Attached.
     268              : ///
     269              : /// The typical use of a non-Active scheduling policy is one of:
     270              : /// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
     271              : /// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
     272              : ///
     273              : /// If you're not sure which policy to use to pin a shard to its current location, you probably
     274              : /// want Pause.
     275            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
     276              : pub enum ShardSchedulingPolicy {
     277              :     // Normal mode: the tenant's scheduled locations may be updated at will, including
     278              :     // for non-essential optimization.
     279              :     Active,
     280              : 
     281              :     // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
     282              :     // For example, this still permits a node's attachment location to change to a secondary in
     283              :     // response to a node failure, or to assign a new secondary if a node was removed.
     284              :     Essential,
     285              : 
     286              :     // No scheduling: leave the shard running wherever it currently is.  Even if the shard is
     287              :     // unavailable, it will not be rescheduled to another node.
     288              :     Pause,
     289              : 
     290              :     // No reconciling: we will make no location_conf API calls to pageservers at all.  If the
     291              :     // shard is unavailable, it stays that way.  If a node fails, this shard doesn't get failed over.
     292              :     Stop,
     293              : }
     294              : 
     295              : impl Default for ShardSchedulingPolicy {
     296        12841 :     fn default() -> Self {
     297        12841 :         Self::Active
     298        12841 :     }
     299              : }
     300              : 
     301            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
     302              : pub enum NodeSchedulingPolicy {
     303              :     Active,
     304              :     Filling,
     305              :     Pause,
     306              :     PauseForRestart,
     307              :     Draining,
     308              : }
     309              : 
     310              : impl FromStr for NodeSchedulingPolicy {
     311              :     type Err = anyhow::Error;
     312              : 
     313            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     314            0 :         match s {
     315            0 :             "active" => Ok(Self::Active),
     316            0 :             "filling" => Ok(Self::Filling),
     317            0 :             "pause" => Ok(Self::Pause),
     318            0 :             "pause_for_restart" => Ok(Self::PauseForRestart),
     319            0 :             "draining" => Ok(Self::Draining),
     320            0 :             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
     321              :         }
     322            0 :     }
     323              : }
     324              : 
     325              : impl From<NodeSchedulingPolicy> for String {
     326            0 :     fn from(value: NodeSchedulingPolicy) -> String {
     327              :         use NodeSchedulingPolicy::*;
     328            0 :         match value {
     329            0 :             Active => "active",
     330            0 :             Filling => "filling",
     331            0 :             Pause => "pause",
     332            0 :             PauseForRestart => "pause_for_restart",
     333            0 :             Draining => "draining",
     334              :         }
     335            0 :         .to_string()
     336            0 :     }
     337              : }
     338              : 
     339            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
     340              : pub enum SkSchedulingPolicy {
     341              :     Active,
     342              :     Pause,
     343              :     Decomissioned,
     344              : }
     345              : 
     346              : impl FromStr for SkSchedulingPolicy {
     347              :     type Err = anyhow::Error;
     348              : 
     349            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     350            0 :         Ok(match s {
     351            0 :             "active" => Self::Active,
     352            0 :             "pause" => Self::Pause,
     353            0 :             "decomissioned" => Self::Decomissioned,
     354              :             _ => {
     355            0 :                 return Err(anyhow::anyhow!(
     356            0 :                     "Unknown scheduling policy '{s}', try active,pause,decomissioned"
     357            0 :                 ))
     358              :             }
     359              :         })
     360            0 :     }
     361              : }
     362              : 
     363              : impl From<SkSchedulingPolicy> for String {
     364            0 :     fn from(value: SkSchedulingPolicy) -> String {
     365              :         use SkSchedulingPolicy::*;
     366            0 :         match value {
     367            0 :             Active => "active",
     368            0 :             Pause => "pause",
     369            0 :             Decomissioned => "decomissioned",
     370              :         }
     371            0 :         .to_string()
     372            0 :     }
     373              : }
     374              : 
     375              : /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
     376              : /// to create secondary locations.
     377            2 : #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
     378              : pub enum PlacementPolicy {
     379              :     /// Normal live state: one attached pageserver and zero or more secondaries.
     380              :     Attached(usize),
     381              :     /// Create one secondary mode locations. This is useful when onboarding
     382              :     /// a tenant, or for an idle tenant that we might want to bring online quickly.
     383              :     Secondary,
     384              : 
     385              :     /// Do not attach to any pageservers.  This is appropriate for tenants that
     386              :     /// have been idle for a long time, where we do not mind some delay in making
     387              :     /// them available in future.
     388              :     Detached,
     389              : }
     390              : 
     391              : impl PlacementPolicy {
     392           53 :     pub fn want_secondaries(&self) -> usize {
     393           53 :         match self {
     394           50 :             PlacementPolicy::Attached(secondary_count) => *secondary_count,
     395            3 :             PlacementPolicy::Secondary => 1,
     396            0 :             PlacementPolicy::Detached => 0,
     397              :         }
     398           53 :     }
     399              : }
     400              : 
     401            0 : #[derive(Serialize, Deserialize, Debug)]
     402              : pub struct TenantShardMigrateResponse {}
     403              : 
     404              : /// Metadata health record posted from scrubber.
     405            0 : #[derive(Serialize, Deserialize, Debug)]
     406              : pub struct MetadataHealthRecord {
     407              :     pub tenant_shard_id: TenantShardId,
     408              :     pub healthy: bool,
     409              :     pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
     410              : }
     411              : 
     412            0 : #[derive(Serialize, Deserialize, Debug)]
     413              : pub struct MetadataHealthUpdateRequest {
     414              :     pub healthy_tenant_shards: HashSet<TenantShardId>,
     415              :     pub unhealthy_tenant_shards: HashSet<TenantShardId>,
     416              : }
     417              : 
     418            0 : #[derive(Serialize, Deserialize, Debug)]
     419              : pub struct MetadataHealthUpdateResponse {}
     420              : 
     421            0 : #[derive(Serialize, Deserialize, Debug)]
     422              : pub struct MetadataHealthListUnhealthyResponse {
     423              :     pub unhealthy_tenant_shards: Vec<TenantShardId>,
     424              : }
     425              : 
     426            0 : #[derive(Serialize, Deserialize, Debug)]
     427              : pub struct MetadataHealthListOutdatedRequest {
     428              :     #[serde(with = "humantime_serde")]
     429              :     pub not_scrubbed_for: Duration,
     430              : }
     431              : 
     432            0 : #[derive(Serialize, Deserialize, Debug)]
     433              : pub struct MetadataHealthListOutdatedResponse {
     434              :     pub health_records: Vec<MetadataHealthRecord>,
     435              : }
     436              : 
     437              : /// Publicly exposed safekeeper description
     438            0 : #[derive(Serialize, Deserialize, Clone)]
     439              : pub struct SafekeeperDescribeResponse {
     440              :     pub id: NodeId,
     441              :     pub region_id: String,
     442              :     /// 1 is special, it means just created (not currently posted to storcon).
     443              :     /// Zero or negative is not really expected.
     444              :     /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
     445              :     pub version: i64,
     446              :     pub host: String,
     447              :     pub port: i32,
     448              :     pub http_port: i32,
     449              :     pub availability_zone_id: String,
     450              :     pub scheduling_policy: SkSchedulingPolicy,
     451              : }
     452              : 
     453            0 : #[derive(Serialize, Deserialize, Clone)]
     454              : pub struct SafekeeperSchedulingPolicyRequest {
     455              :     pub scheduling_policy: SkSchedulingPolicy,
     456              : }
     457              : 
     458              : #[cfg(test)]
     459              : mod test {
     460              :     use super::*;
     461              :     use serde_json;
     462              : 
     463              :     /// Check stability of PlacementPolicy's serialization
     464              :     #[test]
     465            1 :     fn placement_policy_encoding() -> anyhow::Result<()> {
     466            1 :         let v = PlacementPolicy::Attached(1);
     467            1 :         let encoded = serde_json::to_string(&v)?;
     468            1 :         assert_eq!(encoded, "{\"Attached\":1}");
     469            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     470              : 
     471            1 :         let v = PlacementPolicy::Detached;
     472            1 :         let encoded = serde_json::to_string(&v)?;
     473            1 :         assert_eq!(encoded, "\"Detached\"");
     474            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     475            1 :         Ok(())
     476            1 :     }
     477              : 
     478              :     #[test]
     479            1 :     fn test_reject_unknown_field() {
     480            1 :         let id = TenantId::generate();
     481            1 :         let create_request = serde_json::json!({
     482            1 :             "new_tenant_id": id.to_string(),
     483            1 :             "unknown_field": "unknown_value".to_string(),
     484            1 :         });
     485            1 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     486            1 :         assert!(
     487            1 :             err.to_string().contains("unknown field `unknown_field`"),
     488            0 :             "expect unknown field `unknown_field` error, got: {}",
     489              :             err
     490              :         );
     491            1 :     }
     492              : }
        

Generated by: LCOV version 2.1-beta