LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - controller_api.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 33.3 % 87 29
Test Date: 2024-08-02 21:34:27 Functions: 3.5 % 314 11

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::str::FromStr;
       3              : use std::time::{Duration, Instant};
       4              : 
       5              : /// Request/response types for the storage controller
       6              : /// API (`/control/v1` prefix).  Implemented by the server
       7              : /// in [`storage_controller::http`]
       8              : use serde::{Deserialize, Serialize};
       9              : use utils::id::{NodeId, TenantId};
      10              : 
      11              : use crate::{
      12              :     models::{ShardParameters, TenantConfig},
      13              :     shard::{ShardStripeSize, TenantShardId},
      14              : };
      15              : 
/// Request body for creating a tenant through the storage controller API.
///
/// Deserialization rejects unknown fields; `test_reject_unknown_field` in this
/// file pins that behaviour.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
    /// Identifier of the tenant (shard) to create.
    pub new_tenant_id: TenantShardId,
    /// Optional generation number. Defaults to `None` when absent from the
    /// input and is left out of the serialized form when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation: Option<u32>,

    /// If omitted, create a single shard with TenantShardId::unsharded().
    /// Left out of the serialized form when `ShardParameters::is_unsharded`
    /// returns true.
    #[serde(default)]
    #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
    pub shard_parameters: ShardParameters,

    /// Optional placement policy. Defaults to `None` when absent and is left
    /// out of the serialized form when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub placement_policy: Option<PlacementPolicy>,

    /// Tenant configuration; its fields are flattened into the top level of
    /// this request's serialized form.
    #[serde(flatten)]
    pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
      36              : 
/// One shard entry of a [`TenantCreateResponse`].
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponseShard {
    /// Identifier of the shard.
    pub shard_id: TenantShardId,
    /// Node associated with this shard in the response.
    // NOTE(review): presumably the pageserver the shard was attached to — confirm against the handler.
    pub node_id: NodeId,
    /// Generation number reported for this shard.
    pub generation: u32,
}
      43              : 
/// Response to a tenant creation request: one entry per shard.
#[derive(Serialize, Deserialize)]
pub struct TenantCreateResponse {
    /// Per-shard results.
    pub shards: Vec<TenantCreateResponseShard>,
}
      48              : 
/// Request body describing a node and its listen endpoints.
// NOTE(review): presumably sent when a pageserver registers itself with the
// storage controller — the endpoint is not visible in this file; confirm.
#[derive(Serialize, Deserialize)]
pub struct NodeRegisterRequest {
    /// Identifier of the node being registered.
    pub node_id: NodeId,

    /// Address part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_addr: String,
    /// Port part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_port: u16,

    /// Address part of the node's HTTP listen endpoint.
    pub listen_http_addr: String,
    /// Port part of the node's HTTP listen endpoint.
    pub listen_http_port: u16,
}
      59              : 
/// Request body for changing the configuration of a node.
///
/// Both settings are optional.
// NOTE(review): `None` presumably means "leave this setting unchanged" —
// confirm against the handler.
#[derive(Serialize, Deserialize)]
pub struct NodeConfigureRequest {
    /// Node the request applies to.
    pub node_id: NodeId,

    /// Availability to set, in its payload-free wire form.
    pub availability: Option<NodeAvailabilityWrapper>,
    /// Scheduling policy to set.
    pub scheduling: Option<NodeSchedulingPolicy>,
}
      67              : 
/// Request body for changing a tenant's placement and/or scheduling policy.
// NOTE(review): `None` presumably means "leave this policy unchanged" —
// confirm against the handler.
#[derive(Serialize, Deserialize)]
pub struct TenantPolicyRequest {
    /// Placement policy to set.
    pub placement: Option<PlacementPolicy>,
    /// Shard scheduling policy to set.
    pub scheduling: Option<ShardSchedulingPolicy>,
}
      73              : 
/// One shard entry of a [`TenantLocateResponse`]: the shard, a node, and that
/// node's listen endpoints.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
    /// Identifier of the shard.
    pub shard_id: TenantShardId,
    /// Node reported for this shard.
    pub node_id: NodeId,

    /// Address part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_addr: String,
    /// Port part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_port: u16,

    /// Address part of the node's HTTP listen endpoint.
    pub listen_http_addr: String,
    /// Port part of the node's HTTP listen endpoint.
    pub listen_http_port: u16,
}
      85              : 
/// Response describing where the shards of a tenant are located.
#[derive(Serialize, Deserialize)]
pub struct TenantLocateResponse {
    /// One entry per shard.
    pub shards: Vec<TenantLocateResponseShard>,
    /// Sharding parameters reported alongside the shard list.
    pub shard_params: ShardParameters,
}
      91              : 
/// Response describing a tenant: its shards, stripe size, placement policy
/// and configuration.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
    /// Tenant being described.
    pub tenant_id: TenantId,
    /// Per-shard state.
    pub shards: Vec<TenantDescribeResponseShard>,
    /// Stripe size reported for the tenant.
    pub stripe_size: ShardStripeSize,
    /// Placement policy reported for the tenant.
    pub policy: PlacementPolicy,
    /// Tenant configuration.
    pub config: TenantConfig,
}
     100              : 
/// Response describing a node: its identity, availability, scheduling policy
/// and listen endpoints.
#[derive(Serialize, Deserialize)]
pub struct NodeDescribeResponse {
    /// Identifier of the node.
    pub id: NodeId,

    /// Availability of the node, in its payload-free wire form.
    pub availability: NodeAvailabilityWrapper,
    /// Scheduling policy of the node.
    pub scheduling: NodeSchedulingPolicy,

    /// Address part of the node's HTTP listen endpoint.
    pub listen_http_addr: String,
    /// Port part of the node's HTTP listen endpoint.
    pub listen_http_port: u16,

    /// Address part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_addr: String,
    /// Port part of the node's Postgres-protocol listen endpoint.
    pub listen_pg_port: u16,
}
     114              : 
/// Per-shard entry of a [`TenantDescribeResponse`].
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
    /// Identifier of the shard.
    pub tenant_shard_id: TenantShardId,

    /// Node holding the attached location, if any.
    pub node_attached: Option<NodeId>,
    /// Nodes holding secondary locations.
    pub node_secondary: Vec<NodeId>,

    /// Last error recorded for this shard, as text.
    // NOTE(review): presumably empty when there is no error — confirm against the producer.
    pub last_error: String,

    /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
    pub is_reconciling: bool,
    /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
    pub is_pending_compute_notification: bool,
    /// A shard split is currently underway
    pub is_splitting: bool,

    /// Scheduling policy currently applied to this shard.
    pub scheduling_policy: ShardSchedulingPolicy,
}
     133              : 
/// Request to migrate one particular shard.
///
/// Explicitly migrating a particular shard is a low level operation.
///
/// TODO: higher level "Reschedule tenant" operation where the request
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
    /// Shard to migrate.
    pub tenant_shard_id: TenantShardId,
    /// Node named by the request.
    // NOTE(review): presumably the migration destination — confirm against the handler.
    pub node_id: NodeId,
}
     142              : 
     143              : /// Utilisation score indicating how good a candidate a pageserver
     144              : /// is for scheduling the next tenant. See [`crate::models::PageserverUtilization`].
     145              : /// Lower values are better.
     146            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)]
     147              : pub struct UtilizationScore(pub u64);
     148              : 
     149              : impl UtilizationScore {
     150           44 :     pub fn worst() -> Self {
     151           44 :         UtilizationScore(u64::MAX)
     152           44 :     }
     153              : }
     154              : 
/// Availability of a node.
///
/// Serialization goes through [`NodeAvailabilityWrapper`]
/// (`#[serde(into = ...)]`), so the payloads carried by the variants are not
/// part of the serialized form.
#[derive(Serialize, Clone, Copy, Debug)]
#[serde(into = "NodeAvailabilityWrapper")]
pub enum NodeAvailability {
    /// Normal, happy state
    Active(UtilizationScore),
    /// Node is warming up, but we expect it to become available soon. Covers
    /// the time span between the re-attach response being composed on the storage controller
    /// and the first successful heartbeat after the processing of the re-attach response
    /// finishes on the pageserver.
    // NOTE(review): the `Instant` is presumably when warm-up started — confirm.
    WarmingUp(Instant),
    /// Offline: Tenants shouldn't try to attach here, but they may assume that their
    /// secondary locations on this node still exist.  Newly added nodes are in this
    /// state until we successfully contact them.
    Offline,
}
     170              : 
     171              : impl PartialEq for NodeAvailability {
     172            0 :     fn eq(&self, other: &Self) -> bool {
     173              :         use NodeAvailability::*;
     174            0 :         matches!(
     175            0 :             (self, other),
     176              :             (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
     177              :         )
     178            0 :     }
     179              : }
     180              : 
     181              : impl Eq for NodeAvailability {}
     182              : 
/// Payload-free mirror of [`NodeAvailability`].
///
/// This wrapper provides serde functionality and it should only be used to
/// communicate with external callers which don't know or care about the
/// utilisation score of the pageserver it is targeting.
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub enum NodeAvailabilityWrapper {
    /// Wire form of `NodeAvailability::Active`.
    Active,
    /// Wire form of `NodeAvailability::WarmingUp`.
    WarmingUp,
    /// Wire form of `NodeAvailability::Offline`.
    Offline,
}
     192              : 
     193              : impl From<NodeAvailabilityWrapper> for NodeAvailability {
     194            0 :     fn from(val: NodeAvailabilityWrapper) -> Self {
     195            0 :         match val {
     196              :             // Assume the worst utilisation score to begin with. It will later be updated by
     197              :             // the heartbeats.
     198            0 :             NodeAvailabilityWrapper::Active => NodeAvailability::Active(UtilizationScore::worst()),
     199            0 :             NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
     200            0 :             NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
     201              :         }
     202            0 :     }
     203              : }
     204              : 
     205              : impl From<NodeAvailability> for NodeAvailabilityWrapper {
     206            0 :     fn from(val: NodeAvailability) -> Self {
     207            0 :         match val {
     208            0 :             NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
     209            0 :             NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
     210            0 :             NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
     211              :         }
     212            0 :     }
     213              : }
     214              : 
/// How much freedom there is to reschedule and reconcile a shard. The
/// `Default` impl in this file yields `Active`.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
    /// Normal mode: the tenant's scheduled locations may be updated at will, including
    /// for non-essential optimization.
    Active,

    /// Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
    /// For example, this still permits a node's attachment location to change to a secondary in
    /// response to a node failure, or to assign a new secondary if a node was removed.
    Essential,

    /// No scheduling: leave the shard running wherever it currently is.  Even if the shard is
    /// unavailable, it will not be rescheduled to another node.
    Pause,

    /// No reconciling: we will make no location_conf API calls to pageservers at all.  If the
    /// shard is unavailable, it stays that way.  If a node fails, this shard doesn't get failed over.
    Stop,
}
     234              : 
     235              : impl Default for ShardSchedulingPolicy {
     236           22 :     fn default() -> Self {
     237           22 :         Self::Active
     238           22 :     }
     239              : }
     240              : 
/// Scheduling policy of a node.
///
/// Two textual forms exist in this file: the `FromStr` / `From<..> for String`
/// impls use lowercase snake_case names (`"active"`, `"filling"`, `"pause"`,
/// `"pause_for_restart"`, `"draining"`), while the serde derives carry no
/// rename attribute and therefore use the variant names as written.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}
     249              : 
     250              : impl FromStr for NodeSchedulingPolicy {
     251              :     type Err = anyhow::Error;
     252              : 
     253            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     254            0 :         match s {
     255            0 :             "active" => Ok(Self::Active),
     256            0 :             "filling" => Ok(Self::Filling),
     257            0 :             "pause" => Ok(Self::Pause),
     258            0 :             "pause_for_restart" => Ok(Self::PauseForRestart),
     259            0 :             "draining" => Ok(Self::Draining),
     260            0 :             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
     261              :         }
     262            0 :     }
     263              : }
     264              : 
     265              : impl From<NodeSchedulingPolicy> for String {
     266            0 :     fn from(value: NodeSchedulingPolicy) -> String {
     267            0 :         use NodeSchedulingPolicy::*;
     268            0 :         match value {
     269            0 :             Active => "active",
     270            0 :             Filling => "filling",
     271            0 :             Pause => "pause",
     272            0 :             PauseForRestart => "pause_for_restart",
     273            0 :             Draining => "draining",
     274              :         }
     275            0 :         .to_string()
     276            0 :     }
     277              : }
     278              : 
/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
///
/// The JSON encoding is pinned by `placement_policy_encoding` in this file's
/// tests (`{"Attached":1}`, `"Detached"`).
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum PlacementPolicy {
    /// Normal live state: one attached pageserver and zero or more secondaries.
    // NOTE(review): the `usize` is presumably the number of secondaries — confirm.
    Attached(usize),
    /// Create one secondary-mode location. This is useful when onboarding
    /// a tenant, or for an idle tenant that we might want to bring online quickly.
    Secondary,

    /// Do not attach to any pageservers.  This is appropriate for tenants that
    /// have been idle for a long time, where we do not mind some delay in making
    /// them available in future.
    Detached,
}
     294              : 
/// Response to a [`TenantShardMigrateRequest`]; carries no fields.
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
     297              : 
/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
    /// Shard the record is about.
    pub tenant_shard_id: TenantShardId,
    /// Whether the shard's metadata was reported healthy.
    pub healthy: bool,
    /// UTC timestamp of the last scrub of this shard.
    pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
     305              : 
/// Request reporting metadata health as two sets of shards.
// NOTE(review): nothing in this type prevents a shard from appearing in both
// sets; how the receiver treats that case is not visible here.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
    /// Shards reported as healthy.
    pub healthy_tenant_shards: HashSet<TenantShardId>,
    /// Shards reported as unhealthy.
    pub unhealthy_tenant_shards: HashSet<TenantShardId>,
}
     311              : 
/// Response to a [`MetadataHealthUpdateRequest`]; carries no fields.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}
     314              : 
/// Response listing the shards currently considered unhealthy.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
    /// Shards reported as unhealthy.
    pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
     320              : 
/// Request for the health records of shards that have not been scrubbed for
/// a given amount of time.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
    /// Age threshold. (De)serialized through `humantime_serde`, i.e. as a
    /// human-readable duration string rather than serde's default `Duration`
    /// representation.
    #[serde(with = "humantime_serde")]
    pub not_scrubbed_for: Duration,
}
     327              : 
/// Response to a [`MetadataHealthListOutdatedRequest`].
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
    /// Health records returned for the request.
    pub health_records: Vec<MetadataHealthRecord>,
}
     333              : 
     334              : #[cfg(test)]
     335              : mod test {
     336              :     use super::*;
     337              :     use serde_json;
     338              : 
     339              :     /// Check stability of PlacementPolicy's serialization
     340              :     #[test]
     341            2 :     fn placement_policy_encoding() -> anyhow::Result<()> {
     342            2 :         let v = PlacementPolicy::Attached(1);
     343            2 :         let encoded = serde_json::to_string(&v)?;
     344            2 :         assert_eq!(encoded, "{\"Attached\":1}");
     345            2 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     346              : 
     347            2 :         let v = PlacementPolicy::Detached;
     348            2 :         let encoded = serde_json::to_string(&v)?;
     349            2 :         assert_eq!(encoded, "\"Detached\"");
     350            2 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     351            2 :         Ok(())
     352            2 :     }
     353              : 
     354              :     #[test]
     355            2 :     fn test_reject_unknown_field() {
     356            2 :         let id = TenantId::generate();
     357            2 :         let create_request = serde_json::json!({
     358            2 :             "new_tenant_id": id.to_string(),
     359            2 :             "unknown_field": "unknown_value".to_string(),
     360            2 :         });
     361            2 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     362            2 :         assert!(
     363            2 :             err.to_string().contains("unknown field `unknown_field`"),
     364            0 :             "expect unknown field `unknown_field` error, got: {}",
     365              :             err
     366              :         );
     367            2 :     }
     368              : }
        

Generated by: LCOV version 2.1-beta