LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - controller_api.rs (source / functions) Coverage Total Hit
Test: 7179b4db0d82ca8088cc95c44c4be4232078509c.info Lines: 28.9 % 90 26
Test Date: 2024-11-21 16:46:58 Functions: 2.7 % 373 10

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::fmt::Display;
       3              : use std::str::FromStr;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : /// Request/response types for the storage controller
       7              : /// API (`/control/v1` prefix).  Implemented by the server
       8              : /// in [`storage_controller::http`]
       9              : use serde::{Deserialize, Serialize};
      10              : use utils::id::{NodeId, TenantId};
      11              : 
      12              : use crate::models::PageserverUtilization;
      13              : use crate::{
      14              :     models::{ShardParameters, TenantConfig},
      15              :     shard::{ShardStripeSize, TenantShardId},
      16              : };
      17              : 
/// Body of a tenant creation request.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
/// Because `config` is flattened, the tenant configuration keys live in the
/// same JSON object as the fields declared here.
///
/// NOTE(review): serde documents `deny_unknown_fields` as unsupported in
/// combination with `flatten`; the `test_reject_unknown_field` unit test in
/// this file pins the behavior relied on here — confirm when upgrading serde.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    /// Identifier of the tenant (shard) to create.
    pub new_tenant_id: TenantShardId,
    /// Optional generation number. Defaults to `None` when absent and is left
    /// out of the serialized form when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    /// Sharding parameters. If omitted, create a single shard with
    /// `TenantShardId::unsharded()`. Left out of the serialized form when
    /// `ShardParameters::is_unsharded` returns true.
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    /// Optional placement policy. Defaults to `None` when absent and is left
    /// out of the serialized form when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    /// Tenant configuration, flattened into the top-level object.
    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
      38              : 
/// Per-shard entry of a [`TenantCreateResponse`].
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    /// Identifier of the shard this entry describes.
    pub shard_id: TenantShardId,
    /// Node associated with the shard.
    // NOTE(review): presumably the node the shard was attached to — confirm against the handler.
    pub node_id: NodeId,
    /// Generation number reported for the shard.
    pub generation: u32,
}
      45              : 
/// Response to a tenant creation request: one entry per shard.
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    /// The shards reported for the tenant.
    pub shards: Vec<TenantCreateResponseShard>,
}
      50              : 
/// Body of a node registration request.
///
/// All fields are required: none of them carries a serde default.
#[derive(Serialize, Deserialize)]
pub struct NodeRegisterRequest {
    /// Identifier of the node being registered.
    pub node_id: NodeId,

    // NOTE(review): `pg` presumably refers to the Postgres-protocol listener — confirm.
    /// Address of the node's `pg` listener.
    pub listen_pg_addr: String,
    /// Port of the node's `pg` listener.
    pub listen_pg_port: u16,

    /// Address of the node's HTTP listener.
    pub listen_http_addr: String,
    /// Port of the node's HTTP listener.
    pub listen_http_port: u16,

    /// Availability zone the node reports for itself.
    pub availability_zone_id: AvailabilityZone,
}
      63              : 
/// Body of a node configuration request.
// NOTE(review): presumably a `None` field leaves that setting unchanged — confirm against the handler.
#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    /// Node the request applies to.
    pub node_id: NodeId,

    /// Optional new availability, in its payload-free wire form.
    pub availability: Option<NodeAvailabilityWrapper>,
    /// Optional new scheduling policy for the node.
    pub scheduling: Option<NodeSchedulingPolicy>,
}
      71              : 
/// Body of a tenant policy request.
// NOTE(review): presumably a `None` field leaves that policy unchanged — confirm against the handler.
#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    /// Optional placement policy.
    pub placement: Option<PlacementPolicy>,
    /// Optional shard scheduling policy.
    pub scheduling: Option<ShardSchedulingPolicy>,
}
      77              : 
/// Newtype around an availability zone identifier string.
///
/// Derives `Eq` and `Hash`, so it can be used as a set element or map key.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AvailabilityZone(pub String);
      80              : 
      81              : impl Display for AvailabilityZone {
      82            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      83            0 :         write!(f, "{}", self.0)
      84            0 :     }
      85              : }
      86              : 
/// Body of a request carrying a preferred availability zone per tenant shard.
///
/// The single map field is flattened, so the JSON body is the map itself
/// (shard id -> availability zone) with no wrapping key.
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsRequest {
    /// Preferred availability zone for each listed tenant shard.
    #[serde(flatten)]
    pub preferred_az_ids: HashMap<TenantShardId, AvailabilityZone>,
}
      92              : 
/// Response to a [`ShardsPreferredAzsRequest`].
#[derive(Serialize, Deserialize)]
pub struct ShardsPreferredAzsResponse {
    /// Tenant shards reported as updated.
    // NOTE(review): presumably the subset of requested shards whose preferred AZ was changed — confirm.
    pub updated: Vec<TenantShardId>,
}
      97              : 
/// Per-shard entry of a [`TenantLocateResponse`]: a shard together with the
/// listen addresses of a node.
// NOTE(review): presumably the node is the one the shard is currently attached to — confirm.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    /// Identifier of the shard this entry describes.
    pub shard_id: TenantShardId,
    /// Node the addresses below belong to.
    pub node_id: NodeId,

    /// Address of the node's `pg` listener.
    pub listen_pg_addr: String,
    /// Port of the node's `pg` listener.
    pub listen_pg_port: u16,

    /// Address of the node's HTTP listener.
    pub listen_http_addr: String,
    /// Port of the node's HTTP listener.
    pub listen_http_port: u16,
}
     109              : 
/// Response to a tenant locate request: per-shard locations plus the tenant's
/// sharding parameters.
#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    /// One entry per located shard.
    pub shards: Vec<TenantLocateResponseShard>,
    /// Sharding parameters of the tenant.
    pub shard_params: ShardParameters,
}
     115              : 
/// Response describing a tenant: tenant-wide settings plus per-shard state.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    /// Tenant being described.
    pub tenant_id: TenantId,
    /// State of each of the tenant's shards.
    pub shards: Vec<TenantDescribeResponseShard>,
    /// Stripe size used by the tenant's sharding.
    pub stripe_size: ShardStripeSize,
    /// Placement policy of the tenant.
    pub policy: PlacementPolicy,
    /// Tenant configuration (nested here, unlike the flattened form in `TenantCreateRequest`).
    pub config: TenantConfig,
}
     124              : 
/// Response listing tenant shards in relation to one node.
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
    /// Node the listing refers to.
    pub node_id: NodeId,
    /// Shard entries for that node.
    pub shards: Vec<NodeShard>,
}
     130              : 
/// Entry of a [`NodeShardResponse`]: one tenant shard and its observed and
/// intended roles on the node.
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShard {
    /// Tenant shard this entry describes.
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard is observed as a secondary on the node:
    /// `Some(true)` = yes, `Some(false)` = no, `None` = not on this node.
    pub is_observed_secondary: Option<bool>,
    /// Whether the shard is intended to be a secondary on the node:
    /// `Some(true)` = yes, `Some(false)` = no, `None` = not on this node.
    pub is_intended_secondary: Option<bool>,
}
     139              : 
/// Response describing a single node: its state and listen addresses.
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    /// Identifier of the node.
    pub id: NodeId,

    /// Availability of the node, in its payload-free wire form.
    pub availability: NodeAvailabilityWrapper,
    /// Scheduling policy of the node.
    pub scheduling: NodeSchedulingPolicy,

    /// Address of the node's HTTP listener.
    pub listen_http_addr: String,
    /// Port of the node's HTTP listener.
    pub listen_http_port: u16,

    /// Address of the node's `pg` listener.
    pub listen_pg_addr: String,
    /// Port of the node's `pg` listener.
    pub listen_pg_port: u16,
}
     153              : 
/// Per-shard entry of a [`TenantDescribeResponse`].
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    /// Tenant shard this entry describes.
    pub tenant_shard_id: TenantShardId,

    /// Node holding the attached location, if any.
    pub node_attached: Option<NodeId>,
    /// Nodes holding secondary locations.
    pub node_secondary: Vec<NodeId>,

    /// Last error recorded for the shard, as text.
    // NOTE(review): presumably empty when there is no error, since this is not an `Option` — confirm.
    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    /// Scheduling policy of the shard.
    pub scheduling_policy: ShardSchedulingPolicy,

    /// Preferred availability zone of the shard, if one is set.
    // NOTE(review): a plain `String` here, whereas other types in this file use `AvailabilityZone`.
    pub preferred_az_id: Option<String>,
}
     174              : 
/// Migration request for a given tenant shard to a given node.
///
/// Explicitly migrating a particular shard is a low-level operation.
///
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    /// Shard to migrate.
    pub tenant_shard_id: TenantShardId,
    /// Node to migrate the shard to.
    pub node_id: NodeId,
}
     185              : 
/// Availability state of a node, including per-state payloads.
///
/// Serialization goes through [`NodeAvailabilityWrapper`] (`serde(into = ...)`),
/// so the payloads are not part of the serialized form. Only `Serialize` is
/// derived here; deserialization targets the wrapper type instead.
#[derive(Serialize, Clone, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    /// Normal, happy state
    Active(PageserverUtilization),
    /// Node is warming up, but we expect it to become available soon. Covers
    /// the time span between the re-attach response being composed on the storage controller
    /// and the first successful heartbeat after the processing of the re-attach response
    /// finishes on the pageserver.
    WarmingUp(Instant),
    /// Offline: Tenants shouldn't try to attach here, but they may assume that their
    /// secondary locations on this node still exist.  Newly added nodes are in this
    /// state until we successfully contact them.
    Offline,
}
     201              : 
     202              : impl PartialEq for NodeAvailability {
     203            0 :     fn eq(&self, other: &Self) -> bool {
     204              :         use NodeAvailability::*;
     205            0 :         matches!(
     206            0 :             (self, other),
     207              :             (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
     208              :         )
     209            0 :     }
     210              : }
     211              : 
     212              : impl Eq for NodeAvailability {}
     213              : 
/// Payload-free mirror of [`NodeAvailability`].
///
/// This wrapper provides serde functionality and it should only be used to
/// communicate with external callers which don't know or care about the
/// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    /// Counterpart of `NodeAvailability::Active`.
    Active,
    /// Counterpart of `NodeAvailability::WarmingUp`.
    WarmingUp,
    /// Counterpart of `NodeAvailability::Offline`.
    Offline,
}
     223              : 
     224              : impl From<NodeAvailabilityWrapper> for NodeAvailability {
     225            0 :     fn from(val: NodeAvailabilityWrapper) -> Self {
     226            0 :         match val {
     227              :             // Assume the worst utilisation score to begin with. It will later be updated by
     228              :             // the heartbeats.
     229              :             NodeAvailabilityWrapper::Active => {
     230            0 :                 NodeAvailability::Active(PageserverUtilization::full())
     231              :             }
     232            0 :             NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
     233            0 :             NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
     234              :         }
     235            0 :     }
     236              : }
     237              : 
     238              : impl From<NodeAvailability> for NodeAvailabilityWrapper {
     239            0 :     fn from(val: NodeAvailability) -> Self {
     240            0 :         match val {
     241            0 :             NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
     242            0 :             NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
     243            0 :             NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
     244              :         }
     245            0 :     }
     246              : }
     247              : 
/// How much freedom there is to schedule and reconcile a tenant shard.
///
/// Variants are listed from least to most restrictive. The `Default` impl in
/// this file yields `Active`.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    /// Normal mode: the tenant's scheduled locations may be updated at will, including
    /// for non-essential optimization.
    Active,

    /// Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    /// For example, this still permits a node's attachment location to change to a secondary in
    /// response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    /// No scheduling: leave the shard running wherever it currently is.  Even if the shard is
    /// unavailable, it will not be rescheduled to another node.
    Pause,

    /// No reconciling: we will make no location_conf API calls to pageservers at all.  If the
    /// shard is unavailable, it stays that way.  If a node fails, this shard doesn't get failed over.
    Stop,
}
     267              : 
     268              : impl Default for ShardSchedulingPolicy {
     269        12519 :     fn default() -> Self {
     270        12519 :         Self::Active
     271        12519 :     }
     272              : }
     273              : 
/// Scheduling policy of a node.
///
/// Besides the derived serde representation, each variant has a lowercase
/// string form used by the `FromStr` and `From<NodeSchedulingPolicy> for String`
/// impls in this file; it is quoted on each variant below.
// NOTE(review): the operational meaning of each variant is not visible in this file — document
// it from the scheduler's behavior.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    /// String form: `"active"`.
    Active,
    /// String form: `"filling"`.
    Filling,
    /// String form: `"pause"`.
    Pause,
    /// String form: `"pause_for_restart"`.
    PauseForRestart,
    /// String form: `"draining"`.
    Draining,
}
     282              : 
     283              : impl FromStr for NodeSchedulingPolicy {
     284              :     type Err = anyhow::Error;
     285              : 
     286            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     287            0 :         match s {
     288            0 :             "active" => Ok(Self::Active),
     289            0 :             "filling" => Ok(Self::Filling),
     290            0 :             "pause" => Ok(Self::Pause),
     291            0 :             "pause_for_restart" => Ok(Self::PauseForRestart),
     292            0 :             "draining" => Ok(Self::Draining),
     293            0 :             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
     294              :         }
     295            0 :     }
     296              : }
     297              : 
     298              : impl From<NodeSchedulingPolicy> for String {
     299            0 :     fn from(value: NodeSchedulingPolicy) -> String {
     300              :         use NodeSchedulingPolicy::*;
     301            0 :         match value {
     302            0 :             Active => "active",
     303            0 :             Filling => "filling",
     304            0 :             Pause => "pause",
     305            0 :             PauseForRestart => "pause_for_restart",
     306            0 :             Draining => "draining",
     307              :         }
     308            0 :         .to_string()
     309            0 :     }
     310              : }
     311              : 
/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
///
/// The serialized form is pinned by the `placement_policy_encoding` unit test
/// in this file (e.g. `{"Attached":1}` and `"Detached"`).
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    // NOTE(review): the `usize` is presumably the number of secondaries — confirm.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers.  This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}
     327              : 
/// Response to a [`TenantShardMigrateRequest`]. It has no fields, so it
/// serializes as an empty object.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
     330              : 
/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    /// Tenant shard the record is about.
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard's metadata was found healthy.
    pub healthy: bool,
    /// UTC timestamp of the last scrub of this shard.
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
     338              : 
/// Body of a metadata health update: the shards found healthy and the shards
/// found unhealthy.
// NOTE(review): nothing in this type prevents a shard from appearing in both sets — presumably
// the handler validates that; confirm.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    /// Tenant shards reported as healthy.
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    /// Tenant shards reported as unhealthy.
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}
     344              : 
/// Response to a [`MetadataHealthUpdateRequest`]. It has no fields, so it
/// serializes as an empty object.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}
     347              : 
/// Response listing the tenant shards currently recorded as unhealthy.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    /// Tenant shards recorded as unhealthy.
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
     352              : 
/// Body of a request for health records that have not been refreshed recently.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    /// Age threshold for a record to count as outdated. (De)serialized through
    /// `humantime_serde`, i.e. as a human-readable duration string rather than
    /// serde's default `Duration` representation.
    // NOTE(review): whether the comparison is strict or inclusive is decided by the handler.
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}
     358              : 
/// Response to a [`MetadataHealthListOutdatedRequest`].
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    /// The health records that matched the request.
    pub health_records: Vec<MetadataHealthRecord>,
}
     363              : 
     364              : #[cfg(test)]
     365              : mod test {
     366              :     use super::*;
     367              :     use serde_json;
     368              : 
     369              :     /// Check stability of PlacementPolicy's serialization
     370              :     #[test]
     371            1 :     fn placement_policy_encoding() -> anyhow::Result<()> {
     372            1 :         let v = PlacementPolicy::Attached(1);
     373            1 :         let encoded = serde_json::to_string(&v)?;
     374            1 :         assert_eq!(encoded, "{\"Attached\":1}");
     375            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     376              : 
     377            1 :         let v = PlacementPolicy::Detached;
     378            1 :         let encoded = serde_json::to_string(&v)?;
     379            1 :         assert_eq!(encoded, "\"Detached\"");
     380            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     381            1 :         Ok(())
     382            1 :     }
     383              : 
     384              :     #[test]
     385            1 :     fn test_reject_unknown_field() {
     386            1 :         let id = TenantId::generate();
     387            1 :         let create_request = serde_json::json!({
     388            1 :             "new_tenant_id": id.to_string(),
     389            1 :             "unknown_field": "unknown_value".to_string(),
     390            1 :         });
     391            1 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     392            1 :         assert!(
     393            1 :             err.to_string().contains("unknown field `unknown_field`"),
     394            0 :             "expect unknown field `unknown_field` error, got: {}",
     395              :             err
     396              :         );
     397            1 :     }
     398              : }
        

Generated by: LCOV version 2.1-beta