LCOV - code coverage report
Current view: top level - libs/pageserver_api/src - controller_api.rs (source / functions)
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info
Test Date: 2024-09-20 13:14:58
Coverage: Lines: 30.2 % (26 of 86)   Functions: 2.7 % (10 of 367)

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::str::FromStr;
       3              : use std::time::{Duration, Instant};
       4              : 
       5              : /// Request/response types for the storage controller
       6              : /// API (`/control/v1` prefix).  Implemented by the server
       7              : /// in [`storage_controller::http`]
       8              : use serde::{Deserialize, Serialize};
       9              : use utils::id::{NodeId, TenantId};
      10              : 
      11              : use crate::models::PageserverUtilization;
      12              : use crate::{
      13              :     models::{ShardParameters, TenantConfig},
      14              :     shard::{ShardStripeSize, TenantShardId},
      15              : };
      16              : 
      17            3 : #[derive(Serialize, Deserialize, Debug)]
      18              : #[serde(deny_unknown_fields)]
      19              : pub struct TenantCreateRequest {
      20              :     pub new_tenant_id: TenantShardId,
      21              :     #[serde(default)]
      22              :     #[serde(skip_serializing_if = "Option::is_none")]
      23              :     pub generation: Option<u32>,
      24              : 
      25              :     // If omitted, create a single shard with TenantShardId::unsharded()
      26              :     #[serde(default)]
      27              :     #[serde(skip_serializing_if = "ShardParameters::is_unsharded")]
      28              :     pub shard_parameters: ShardParameters,
      29              : 
      30              :     #[serde(default)]
      31              :     #[serde(skip_serializing_if = "Option::is_none")]
      32              :     pub placement_policy: Option<PlacementPolicy>,
      33              : 
      34              :     #[serde(flatten)]
      35              :     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
      36              : }
      37              : 
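A minimal sketch (not part of the source above) of how this request serializes, assuming the pageserver_api crate paths and the Default impls referenced elsewhere in this file: the three optional fields are skipped when unset, and the flattened config contributes its fields at the top level of the JSON object.

    // Hedged sketch: build and serialize a minimal TenantCreateRequest.
    use pageserver_api::controller_api::TenantCreateRequest;
    use pageserver_api::shard::TenantShardId;
    use utils::id::TenantId;

    fn minimal_create_request() -> anyhow::Result<String> {
        let req = TenantCreateRequest {
            new_tenant_id: TenantShardId::unsharded(TenantId::generate()),
            generation: None,                     // skipped on the wire
            shard_parameters: Default::default(), // unsharded => skipped
            placement_policy: None,               // skipped
            config: Default::default(),           // flattened into the top level
        };
        // With everything at its default, little beyond `new_tenant_id`
        // (plus any non-default TenantConfig fields) appears in the body.
        Ok(serde_json::to_string(&req)?)
    }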
      38            0 : #[derive(Serialize, Deserialize)]
      39              : pub struct TenantCreateResponseShard {
      40              :     pub shard_id: TenantShardId,
      41              :     pub node_id: NodeId,
      42              :     pub generation: u32,
      43              : }
      44              : 
      45            0 : #[derive(Serialize, Deserialize)]
      46              : pub struct TenantCreateResponse {
      47              :     pub shards: Vec<TenantCreateResponseShard>,
      48              : }
      49              : 
      50            0 : #[derive(Serialize, Deserialize)]
      51              : pub struct NodeRegisterRequest {
      52              :     pub node_id: NodeId,
      53              : 
      54              :     pub listen_pg_addr: String,
      55              :     pub listen_pg_port: u16,
      56              : 
      57              :     pub listen_http_addr: String,
      58              :     pub listen_http_port: u16,
      59              : 
      60              :     pub availability_zone_id: String,
      61              : }
      62              : 
      63            0 : #[derive(Serialize, Deserialize)]
      64              : pub struct NodeConfigureRequest {
      65              :     pub node_id: NodeId,
      66              : 
      67              :     pub availability: Option<NodeAvailabilityWrapper>,
      68              :     pub scheduling: Option<NodeSchedulingPolicy>,
      69              : }
      70              : 
      71            0 : #[derive(Serialize, Deserialize)]
      72              : pub struct TenantPolicyRequest {
      73              :     pub placement: Option<PlacementPolicy>,
      74              :     pub scheduling: Option<ShardSchedulingPolicy>,
      75              : }
      76              : 
      77            0 : #[derive(Serialize, Deserialize)]
      78              : pub struct ShardsPreferredAzsRequest {
      79              :     #[serde(flatten)]
      80              :     pub preferred_az_ids: HashMap<TenantShardId, String>,
      81              : }
      82              : 
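Because of #[serde(flatten)], the request body is the map itself: tenant shard ids as keys, availability zone ids as values, with no `preferred_az_ids` wrapper key. A hedged sketch, assuming the unsharded string form of TenantShardId (the id below is illustrative, not a real tenant):

    // Hedged sketch: the flattened body is just a JSON object of id -> AZ.
    use pageserver_api::controller_api::ShardsPreferredAzsRequest;

    fn parse_preferred_azs() -> anyhow::Result<()> {
        let body = r#"{"1f359dd625e519a1a4e8d7509690f6fc": "us-east-2b"}"#;
        let req: ShardsPreferredAzsRequest = serde_json::from_str(body)?;
        assert_eq!(req.preferred_az_ids.len(), 1);
        Ok(())
    }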
      83            0 : #[derive(Serialize, Deserialize)]
      84              : pub struct ShardsPreferredAzsResponse {
      85              :     pub updated: Vec<TenantShardId>,
      86              : }
      87              : 
      88            0 : #[derive(Serialize, Deserialize, Debug)]
      89              : pub struct TenantLocateResponseShard {
      90              :     pub shard_id: TenantShardId,
      91              :     pub node_id: NodeId,
      92              : 
      93              :     pub listen_pg_addr: String,
      94              :     pub listen_pg_port: u16,
      95              : 
      96              :     pub listen_http_addr: String,
      97              :     pub listen_http_port: u16,
      98              : }
      99              : 
     100            0 : #[derive(Serialize, Deserialize)]
     101              : pub struct TenantLocateResponse {
     102              :     pub shards: Vec<TenantLocateResponseShard>,
     103              :     pub shard_params: ShardParameters,
     104              : }
     105              : 
     106            0 : #[derive(Serialize, Deserialize, Debug)]
     107              : pub struct TenantDescribeResponse {
     108              :     pub tenant_id: TenantId,
     109              :     pub shards: Vec<TenantDescribeResponseShard>,
     110              :     pub stripe_size: ShardStripeSize,
     111              :     pub policy: PlacementPolicy,
     112              :     pub config: TenantConfig,
     113              : }
     114              : 
     115            0 : #[derive(Serialize, Deserialize, Debug)]
     116              : pub struct NodeShardResponse {
     117              :     pub node_id: NodeId,
     118              :     pub shards: Vec<NodeShard>,
     119              : }
     120              : 
     121            0 : #[derive(Serialize, Deserialize, Debug)]
     122              : pub struct NodeShard {
     123              :     pub tenant_shard_id: TenantShardId,
      124              :     /// Whether the shard is observed to be a secondary on this node. True = yes, False = no, None = not on this node.
      125              :     pub is_observed_secondary: Option<bool>,
      126              :     /// Whether the shard is intended to be a secondary on this node. True = yes, False = no, None = not on this node.
      127              :     pub is_intended_secondary: Option<bool>,
     128              : }
     129              : 
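The two Option<bool> fields above are tri-state, per node. A hedged sketch of reading one of them; interpreting Some(false) as "present but not a secondary" follows from the comment's wording:

    // Hedged sketch: decoding the tri-state flag on a NodeShard.
    use pageserver_api::controller_api::NodeShard;

    fn observed_state(shard: &NodeShard) -> &'static str {
        match shard.is_observed_secondary {
            Some(true) => "observed on this node as a secondary",
            Some(false) => "observed on this node, but not as a secondary",
            None => "not observed on this node at all",
        }
    }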
     130            0 : #[derive(Serialize, Deserialize)]
     131              : pub struct NodeDescribeResponse {
     132              :     pub id: NodeId,
     133              : 
     134              :     pub availability: NodeAvailabilityWrapper,
     135              :     pub scheduling: NodeSchedulingPolicy,
     136              : 
     137              :     pub listen_http_addr: String,
     138              :     pub listen_http_port: u16,
     139              : 
     140              :     pub listen_pg_addr: String,
     141              :     pub listen_pg_port: u16,
     142              : }
     143              : 
     144            0 : #[derive(Serialize, Deserialize, Debug)]
     145              : pub struct TenantDescribeResponseShard {
     146              :     pub tenant_shard_id: TenantShardId,
     147              : 
     148              :     pub node_attached: Option<NodeId>,
     149              :     pub node_secondary: Vec<NodeId>,
     150              : 
     151              :     pub last_error: String,
     152              : 
     153              :     /// A task is currently running to reconcile this tenant's intent state with the state on pageservers
     154              :     pub is_reconciling: bool,
     155              :     /// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
     156              :     pub is_pending_compute_notification: bool,
     157              :     /// A shard split is currently underway
     158              :     pub is_splitting: bool,
     159              : 
     160              :     pub scheduling_policy: ShardSchedulingPolicy,
     161              : 
     162              :     pub preferred_az_id: Option<String>,
     163              : }
     164              : 
     165              : /// Migration request for a given tenant shard to a given node.
     166              : ///
      167              : /// Explicitly migrating a particular shard is a low-level operation.
      168              : /// TODO: add a higher-level "Reschedule tenant" operation, where the request
      169              : /// specifies some constraints, e.g. asking it to get off particular node(s).
     170            0 : #[derive(Serialize, Deserialize, Debug)]
     171              : pub struct TenantShardMigrateRequest {
     172              :     pub tenant_shard_id: TenantShardId,
     173              :     pub node_id: NodeId,
     174              : }
     175              : 
     176              : #[derive(Serialize, Clone, Debug)]
     177              : #[serde(into = "NodeAvailabilityWrapper")]
     178              : pub enum NodeAvailability {
     179              :     // Normal, happy state
     180              :     Active(PageserverUtilization),
     181              :     // Node is warming up, but we expect it to become available soon. Covers
     182              :     // the time span between the re-attach response being composed on the storage controller
     183              :     // and the first successful heartbeat after the processing of the re-attach response
     184              :     // finishes on the pageserver.
     185              :     WarmingUp(Instant),
     186              :     // Offline: Tenants shouldn't try to attach here, but they may assume that their
     187              :     // secondary locations on this node still exist.  Newly added nodes are in this
     188              :     // state until we successfully contact them.
     189              :     Offline,
     190              : }
     191              : 
     192              : impl PartialEq for NodeAvailability {
     193            0 :     fn eq(&self, other: &Self) -> bool {
     194              :         use NodeAvailability::*;
     195            0 :         matches!(
     196            0 :             (self, other),
     197              :             (Active(_), Active(_)) | (Offline, Offline) | (WarmingUp(_), WarmingUp(_))
     198              :         )
     199            0 :     }
     200              : }
     201              : 
     202              : impl Eq for NodeAvailability {}
     203              : 
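Note that this equality is by variant only: the PageserverUtilization and Instant payloads are ignored. A short sketch, using the PageserverUtilization::full() constructor that appears later in this file (crate paths assumed):

    // Hedged sketch: payloads do not participate in equality.
    use pageserver_api::controller_api::NodeAvailability;
    use pageserver_api::models::PageserverUtilization;
    use std::time::Instant;

    fn variant_only_equality() {
        let a = NodeAvailability::Active(PageserverUtilization::full());
        let b = NodeAvailability::Active(PageserverUtilization::full());
        assert_eq!(a, b);                         // equal regardless of payload
        assert_ne!(a, NodeAvailability::Offline); // different variants differ
        let w1 = NodeAvailability::WarmingUp(Instant::now());
        let w2 = NodeAvailability::WarmingUp(Instant::now());
        assert_eq!(w1, w2);                       // the Instants are ignored too
    }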
      204              : // This wrapper provides serde functionality; it should only be used to
      205              : // communicate with external callers that don't know or care about the
      206              : // utilisation score of the pageserver being targeted.
     207            0 : #[derive(Serialize, Deserialize, Clone, Copy, Debug)]
     208              : pub enum NodeAvailabilityWrapper {
     209              :     Active,
     210              :     WarmingUp,
     211              :     Offline,
     212              : }
     213              : 
     214              : impl From<NodeAvailabilityWrapper> for NodeAvailability {
     215            0 :     fn from(val: NodeAvailabilityWrapper) -> Self {
     216            0 :         match val {
     217              :             // Assume the worst utilisation score to begin with. It will later be updated by
     218              :             // the heartbeats.
     219              :             NodeAvailabilityWrapper::Active => {
     220            0 :                 NodeAvailability::Active(PageserverUtilization::full())
     221              :             }
     222            0 :             NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()),
     223            0 :             NodeAvailabilityWrapper::Offline => NodeAvailability::Offline,
     224              :         }
     225            0 :     }
     226              : }
     227              : 
     228              : impl From<NodeAvailability> for NodeAvailabilityWrapper {
     229            0 :     fn from(val: NodeAvailability) -> Self {
     230            0 :         match val {
     231            0 :             NodeAvailability::Active(_) => NodeAvailabilityWrapper::Active,
     232            0 :             NodeAvailability::WarmingUp(_) => NodeAvailabilityWrapper::WarmingUp,
     233            0 :             NodeAvailability::Offline => NodeAvailabilityWrapper::Offline,
     234              :         }
     235            0 :     }
     236              : }
     237              : 
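The pair of conversions is deliberately lossy in one direction: going to the wrapper drops the payload, and coming back substitutes pessimistic defaults (full() utilisation, a fresh Instant). A hedged round-trip sketch:

    // Hedged sketch: the wrapper round-trip resets payloads pessimistically.
    use pageserver_api::controller_api::{NodeAvailability, NodeAvailabilityWrapper};
    use pageserver_api::models::PageserverUtilization;

    fn lossy_round_trip() {
        let avail = NodeAvailability::Active(PageserverUtilization::full());
        let wrapper: NodeAvailabilityWrapper = avail.into(); // utilization dropped
        let back: NodeAvailability = wrapper.into();         // Active(full()) again
        // Variant-only equality (see the PartialEq impl above) makes this hold:
        assert_eq!(back, NodeAvailability::Active(PageserverUtilization::full()));
    }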
     238            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
     239              : pub enum ShardSchedulingPolicy {
     240              :     // Normal mode: the tenant's scheduled locations may be updated at will, including
     241              :     // for non-essential optimization.
     242              :     Active,
     243              : 
     244              :     // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
     245              :     // For example, this still permits a node's attachment location to change to a secondary in
     246              :     // response to a node failure, or to assign a new secondary if a node was removed.
     247              :     Essential,
     248              : 
     249              :     // No scheduling: leave the shard running wherever it currently is.  Even if the shard is
     250              :     // unavailable, it will not be rescheduled to another node.
     251              :     Pause,
     252              : 
     253              :     // No reconciling: we will make no location_conf API calls to pageservers at all.  If the
     254              :     // shard is unavailable, it stays that way.  If a node fails, this shard doesn't get failed over.
     255              :     Stop,
     256              : }
     257              : 
     258              : impl Default for ShardSchedulingPolicy {
     259           11 :     fn default() -> Self {
     260           11 :         Self::Active
     261           11 :     }
     262              : }
     263              : 
     264            0 : #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
     265              : pub enum NodeSchedulingPolicy {
     266              :     Active,
     267              :     Filling,
     268              :     Pause,
     269              :     PauseForRestart,
     270              :     Draining,
     271              : }
     272              : 
     273              : impl FromStr for NodeSchedulingPolicy {
     274              :     type Err = anyhow::Error;
     275              : 
     276            0 :     fn from_str(s: &str) -> Result<Self, Self::Err> {
     277            0 :         match s {
     278            0 :             "active" => Ok(Self::Active),
     279            0 :             "filling" => Ok(Self::Filling),
     280            0 :             "pause" => Ok(Self::Pause),
     281            0 :             "pause_for_restart" => Ok(Self::PauseForRestart),
     282            0 :             "draining" => Ok(Self::Draining),
     283            0 :             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
     284              :         }
     285            0 :     }
     286              : }
     287              : 
     288              : impl From<NodeSchedulingPolicy> for String {
     289            0 :     fn from(value: NodeSchedulingPolicy) -> String {
     290              :         use NodeSchedulingPolicy::*;
     291            0 :         match value {
     292            0 :             Active => "active",
     293            0 :             Filling => "filling",
     294            0 :             Pause => "pause",
     295            0 :             PauseForRestart => "pause_for_restart",
     296            0 :             Draining => "draining",
     297              :         }
     298            0 :         .to_string()
     299            0 :     }
     300              : }
     301              : 
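Together, the FromStr and From<NodeSchedulingPolicy> impls give a round-trip through the snake_case policy names. A hedged usage sketch:

    // Hedged sketch: parse and re-render a scheduling policy.
    use pageserver_api::controller_api::NodeSchedulingPolicy;
    use std::str::FromStr;

    fn round_trip() -> anyhow::Result<()> {
        let policy = NodeSchedulingPolicy::from_str("pause_for_restart")?;
        assert_eq!(policy, NodeSchedulingPolicy::PauseForRestart);
        assert_eq!(String::from(policy), "pause_for_restart");
        assert!(NodeSchedulingPolicy::from_str("bogus").is_err());
        Ok(())
    }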
     302              : /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
     303              : /// to create secondary locations.
     304            4 : #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
     305              : pub enum PlacementPolicy {
     306              :     /// Normal live state: one attached pageserver and zero or more secondaries.
     307              :     Attached(usize),
      308              :     /// Create one secondary mode location. This is useful when onboarding
     309              :     /// a tenant, or for an idle tenant that we might want to bring online quickly.
     310              :     Secondary,
     311              : 
     312              :     /// Do not attach to any pageservers.  This is appropriate for tenants that
     313              :     /// have been idle for a long time, where we do not mind some delay in making
     314              :     /// them available in future.
     315              :     Detached,
     316              : }
     317              : 
     318            0 : #[derive(Serialize, Deserialize, Debug)]
     319              : pub struct TenantShardMigrateResponse {}
     320              : 
     321              : /// Metadata health record posted from scrubber.
     322            0 : #[derive(Serialize, Deserialize, Debug)]
     323              : pub struct MetadataHealthRecord {
     324              :     pub tenant_shard_id: TenantShardId,
     325              :     pub healthy: bool,
     326              :     pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
     327              : }
     328              : 
     329            0 : #[derive(Serialize, Deserialize, Debug)]
     330              : pub struct MetadataHealthUpdateRequest {
     331              :     pub healthy_tenant_shards: HashSet<TenantShardId>,
     332              :     pub unhealthy_tenant_shards: HashSet<TenantShardId>,
     333              : }
     334              : 
     335            0 : #[derive(Serialize, Deserialize, Debug)]
     336              : pub struct MetadataHealthUpdateResponse {}
     337              : 
     338            0 : #[derive(Serialize, Deserialize, Debug)]
     339              : pub struct MetadataHealthListUnhealthyResponse {
     340              :     pub unhealthy_tenant_shards: Vec<TenantShardId>,
     341              : }
     342              : 
     343            0 : #[derive(Serialize, Deserialize, Debug)]
     344              : pub struct MetadataHealthListOutdatedRequest {
     345              :     #[serde(with = "humantime_serde")]
     346              :     pub not_scrubbed_for: Duration,
     347              : }
     348              : 
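Via humantime_serde, the Duration travels as a humantime string rather than a number of seconds. A hedged sketch of a request body:

    // Hedged sketch: humantime encoding of not_scrubbed_for.
    use pageserver_api::controller_api::MetadataHealthListOutdatedRequest;
    use std::time::Duration;

    fn parse_outdated_request() -> anyhow::Result<()> {
        let req: MetadataHealthListOutdatedRequest =
            serde_json::from_str(r#"{"not_scrubbed_for": "1day"}"#)?;
        assert_eq!(req.not_scrubbed_for, Duration::from_secs(86_400));
        Ok(())
    }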
     349            0 : #[derive(Serialize, Deserialize, Debug)]
     350              : pub struct MetadataHealthListOutdatedResponse {
     351              :     pub health_records: Vec<MetadataHealthRecord>,
     352              : }
     353              : 
     354              : #[cfg(test)]
     355              : mod test {
     356              :     use super::*;
     357              :     use serde_json;
     358              : 
     359              :     /// Check stability of PlacementPolicy's serialization
     360              :     #[test]
     361            1 :     fn placement_policy_encoding() -> anyhow::Result<()> {
     362            1 :         let v = PlacementPolicy::Attached(1);
     363            1 :         let encoded = serde_json::to_string(&v)?;
     364            1 :         assert_eq!(encoded, "{\"Attached\":1}");
     365            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     366              : 
     367            1 :         let v = PlacementPolicy::Detached;
     368            1 :         let encoded = serde_json::to_string(&v)?;
     369            1 :         assert_eq!(encoded, "\"Detached\"");
     370            1 :         assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
     371            1 :         Ok(())
     372            1 :     }
     373              : 
     374              :     #[test]
     375            1 :     fn test_reject_unknown_field() {
     376            1 :         let id = TenantId::generate();
     377            1 :         let create_request = serde_json::json!({
     378            1 :             "new_tenant_id": id.to_string(),
     379            1 :             "unknown_field": "unknown_value".to_string(),
     380            1 :         });
     381            1 :         let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
     382            1 :         assert!(
     383            1 :             err.to_string().contains("unknown field `unknown_field`"),
     384            0 :             "expect unknown field `unknown_field` error, got: {}",
     385              :             err
     386              :         );
     387            1 :     }
     388              : }
        

Generated by: LCOV version 2.1-beta