LCOV - code coverage report
Current view: top level - proxy/src/control_plane - messages.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 65.3 % 202 132
Test Date: 2025-02-20 13:11:02 Functions: 15.2 % 132 20

            Line data    Source code
       1              : use std::fmt::{self, Display};
       2              : 
       3              : use measured::FixedCardinalityLabel;
       4              : use serde::{Deserialize, Serialize};
       5              : 
       6              : use crate::auth::IpPattern;
       7              : use crate::intern::{AccountIdInt, BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
       8              : use crate::proxy::retry::CouldRetry;
       9              : 
      10              : /// Generic error response with human-readable description.
      11              : /// Note that we can't always present it to user as is.
      12            0 : #[derive(Debug, Deserialize, Clone)]
      13              : pub(crate) struct ControlPlaneErrorMessage {
      14              :     pub(crate) error: Box<str>,
      15              :     #[serde(skip)]
      16              :     pub(crate) http_status_code: http::StatusCode,
      17              :     pub(crate) status: Option<Status>,
      18              : }
      19              : 
      20              : impl ControlPlaneErrorMessage {
      21            3 :     pub(crate) fn get_reason(&self) -> Reason {
      22            3 :         self.status
      23            3 :             .as_ref()
      24            3 :             .and_then(|s| s.details.error_info.as_ref())
      25            3 :             .map_or(Reason::Unknown, |e| e.reason)
      26            3 :     }
      27              : 
      28            0 :     pub(crate) fn get_user_facing_message(&self) -> String {
      29              :         use super::errors::REQUEST_FAILED;
      30            0 :         self.status
      31            0 :             .as_ref()
      32            0 :             .and_then(|s| s.details.user_facing_message.as_ref())
      33            0 :             .map_or_else(|| {
      34            0 :                 // Ask @neondatabase/control-plane for review before adding more.
      35            0 :                 match self.http_status_code {
      36              :                     http::StatusCode::NOT_FOUND => {
      37              :                         // Status 404: failed to get a project-related resource.
      38            0 :                         format!("{REQUEST_FAILED}: endpoint cannot be found")
      39              :                     }
      40              :                     http::StatusCode::NOT_ACCEPTABLE => {
      41              :                         // Status 406: endpoint is disabled (we don't allow connections).
      42            0 :                         format!("{REQUEST_FAILED}: endpoint is disabled")
      43              :                     }
      44              :                     http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
      45              :                         // Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
      46            0 :                         format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
      47              :                     }
      48            0 :                     _ => REQUEST_FAILED.to_owned(),
      49              :                 }
      50            0 :             }, |m| m.message.clone().into())
      51            0 :     }
      52              : }
      53              : 
      54              : impl Display for ControlPlaneErrorMessage {
      55            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
      56            0 :         let msg: &str = self
      57            0 :             .status
      58            0 :             .as_ref()
      59            0 :             .and_then(|s| s.details.user_facing_message.as_ref())
      60            0 :             .map_or_else(|| self.error.as_ref(), |m| m.message.as_ref());
      61            0 :         write!(f, "{msg}")
      62            0 :     }
      63              : }
      64              : 
      65              : impl CouldRetry for ControlPlaneErrorMessage {
      66            6 :     fn could_retry(&self) -> bool {
      67              :         // If the error message does not have a status,
      68              :         // the error is unknown and probably should not retry automatically
      69            6 :         let Some(status) = &self.status else {
      70            2 :             return false;
      71              :         };
      72              : 
      73              :         // retry if the retry info is set.
      74            4 :         if status.details.retry_info.is_some() {
      75            4 :             return true;
      76            0 :         }
      77            0 : 
      78            0 :         // if no retry info set, attempt to use the error code to guess the retry state.
      79            0 :         let reason = status
      80            0 :             .details
      81            0 :             .error_info
      82            0 :             .map_or(Reason::Unknown, |e| e.reason);
      83            0 : 
      84            0 :         reason.can_retry()
      85            6 :     }
      86              : }
      87              : 
      88            0 : #[derive(Debug, Deserialize, Clone)]
      89              : #[allow(dead_code)]
      90              : pub(crate) struct Status {
      91              :     pub(crate) code: Box<str>,
      92              :     pub(crate) message: Box<str>,
      93              :     pub(crate) details: Details,
      94              : }
      95              : 
      96            0 : #[derive(Debug, Deserialize, Clone)]
      97              : pub(crate) struct Details {
      98              :     pub(crate) error_info: Option<ErrorInfo>,
      99              :     pub(crate) retry_info: Option<RetryInfo>,
     100              :     pub(crate) user_facing_message: Option<UserFacingMessage>,
     101              : }
     102              : 
     103            0 : #[derive(Copy, Clone, Debug, Deserialize)]
     104              : pub(crate) struct ErrorInfo {
     105              :     pub(crate) reason: Reason,
     106              :     // Schema could also have `metadata` field, but it's not structured. Skip it for now.
     107              : }
     108              : 
     109            0 : #[derive(Clone, Copy, Debug, Deserialize, Default)]
     110              : pub(crate) enum Reason {
     111              :     /// RoleProtected indicates that the role is protected and the attempted operation is not permitted on protected roles.
     112              :     #[serde(rename = "ROLE_PROTECTED")]
     113              :     RoleProtected,
     114              :     /// ResourceNotFound indicates that a resource (project, endpoint, branch, etc.) wasn't found,
     115              :     /// usually due to the provided ID not being correct or because the subject doesn't have enough permissions to
     116              :     /// access the requested resource.
     117              :     /// Prefer a more specific reason if possible, e.g., ProjectNotFound, EndpointNotFound, etc.
     118              :     #[serde(rename = "RESOURCE_NOT_FOUND")]
     119              :     ResourceNotFound,
     120              :     /// ProjectNotFound indicates that the project wasn't found, usually due to the provided ID not being correct,
     121              :     /// or that the subject doesn't have enough permissions to access the requested project.
     122              :     #[serde(rename = "PROJECT_NOT_FOUND")]
     123              :     ProjectNotFound,
     124              :     /// EndpointNotFound indicates that the endpoint wasn't found, usually due to the provided ID not being correct,
     125              :     /// or that the subject doesn't have enough permissions to access the requested endpoint.
     126              :     #[serde(rename = "ENDPOINT_NOT_FOUND")]
     127              :     EndpointNotFound,
     128              :     /// BranchNotFound indicates that the branch wasn't found, usually due to the provided ID not being correct,
     129              :     /// or that the subject doesn't have enough permissions to access the requested branch.
     130              :     #[serde(rename = "BRANCH_NOT_FOUND")]
     131              :     BranchNotFound,
     132              :     /// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
     133              :     #[serde(rename = "RATE_LIMIT_EXCEEDED")]
     134              :     RateLimitExceeded,
     135              :     /// NonDefaultBranchComputeTimeExceeded indicates that the compute time quota of non-default branches has been
     136              :     /// exceeded.
     137              :     #[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
     138              :     NonDefaultBranchComputeTimeExceeded,
     139              :     /// ActiveTimeQuotaExceeded indicates that the active time quota was exceeded.
     140              :     #[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
     141              :     ActiveTimeQuotaExceeded,
     142              :     /// ComputeTimeQuotaExceeded indicates that the compute time quota was exceeded.
     143              :     #[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
     144              :     ComputeTimeQuotaExceeded,
     145              :     /// WrittenDataQuotaExceeded indicates that the written data quota was exceeded.
     146              :     #[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
     147              :     WrittenDataQuotaExceeded,
     148              :     /// DataTransferQuotaExceeded indicates that the data transfer quota was exceeded.
     149              :     #[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
     150              :     DataTransferQuotaExceeded,
     151              :     /// LogicalSizeQuotaExceeded indicates that the logical size quota was exceeded.
     152              :     #[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
     153              :     LogicalSizeQuotaExceeded,
     154              :     /// RunningOperations indicates that the project already has some running operations
     155              :     /// and scheduling of new ones is prohibited.
     156              :     #[serde(rename = "RUNNING_OPERATIONS")]
     157              :     RunningOperations,
     158              :     /// ConcurrencyLimitReached indicates that the concurrency limit for an action was reached.
     159              :     #[serde(rename = "CONCURRENCY_LIMIT_REACHED")]
     160              :     ConcurrencyLimitReached,
     161              :     /// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
     162              :     #[serde(rename = "LOCK_ALREADY_TAKEN")]
     163              :     LockAlreadyTaken,
     164              :     /// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
     165              :     #[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
     166              :     ActiveEndpointsLimitExceeded,
     167              :     #[default]
     168              :     #[serde(other)]
     169              :     Unknown,
     170              : }
     171              : 
     172              : impl Reason {
     173            0 :     pub(crate) fn is_not_found(self) -> bool {
     174            0 :         matches!(
     175            0 :             self,
     176              :             Reason::ResourceNotFound
     177              :                 | Reason::ProjectNotFound
     178              :                 | Reason::EndpointNotFound
     179              :                 | Reason::BranchNotFound
     180              :         )
     181            0 :     }
     182              : 
     183            0 :     pub(crate) fn can_retry(self) -> bool {
     184            0 :         match self {
     185              :             // do not retry role protected errors
     186              :             // not a transitive error
     187            0 :             Reason::RoleProtected => false,
     188              :             // on retry, it will still not be found
     189              :             Reason::ResourceNotFound
     190              :             | Reason::ProjectNotFound
     191              :             | Reason::EndpointNotFound
     192            0 :             | Reason::BranchNotFound => false,
     193              :             // we were asked to go away
     194              :             Reason::RateLimitExceeded
     195              :             | Reason::NonDefaultBranchComputeTimeExceeded
     196              :             | Reason::ActiveTimeQuotaExceeded
     197              :             | Reason::ComputeTimeQuotaExceeded
     198              :             | Reason::WrittenDataQuotaExceeded
     199              :             | Reason::DataTransferQuotaExceeded
     200              :             | Reason::LogicalSizeQuotaExceeded
     201            0 :             | Reason::ActiveEndpointsLimitExceeded => false,
     202              :             // transitive error. control plane is currently busy
     203              :             // but might be ready soon
     204              :             Reason::RunningOperations
     205              :             | Reason::ConcurrencyLimitReached
     206            0 :             | Reason::LockAlreadyTaken => true,
     207              :             // unknown error. better not retry it.
     208            0 :             Reason::Unknown => false,
     209              :         }
     210            0 :     }
     211              : }
     212              : 
     213            0 : #[derive(Copy, Clone, Debug, Deserialize)]
     214              : #[allow(dead_code)]
     215              : pub(crate) struct RetryInfo {
     216              :     pub(crate) retry_delay_ms: u64,
     217              : }
     218              : 
     219            0 : #[derive(Debug, Deserialize, Clone)]
     220              : pub(crate) struct UserFacingMessage {
     221              :     pub(crate) message: Box<str>,
     222              : }
     223              : 
     224              : /// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
     225              : /// Returned by the `/get_endpoint_access_control` API method.
     226           12 : #[derive(Deserialize)]
     227              : pub(crate) struct GetEndpointAccessControl {
     228              :     pub(crate) role_secret: Box<str>,
     229              :     pub(crate) allowed_ips: Option<Vec<IpPattern>>,
     230              :     pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
     231              :     pub(crate) project_id: Option<ProjectIdInt>,
     232              :     pub(crate) account_id: Option<AccountIdInt>,
     233              :     pub(crate) block_public_connections: Option<bool>,
     234              :     pub(crate) block_vpc_connections: Option<bool>,
     235              : }
     236              : 
     237              : /// Response which holds compute node's `host:port` pair.
     238              : /// Returned by the `/proxy_wake_compute` API method.
     239            2 : #[derive(Debug, Deserialize)]
     240              : pub(crate) struct WakeCompute {
     241              :     pub(crate) address: Box<str>,
     242              :     pub(crate) aux: MetricsAuxInfo,
     243              : }
     244              : 
     245              : /// Async response which concludes the console redirect auth flow.
     246              : /// Also known as `kickResponse` in the console.
     247            2 : #[derive(Debug, Deserialize)]
     248              : pub(crate) struct KickSession<'a> {
     249              :     /// Session ID is assigned by the proxy.
     250              :     pub(crate) session_id: &'a str,
     251              : 
     252              :     /// Compute node connection params.
     253              :     #[serde(deserialize_with = "KickSession::parse_db_info")]
     254              :     pub(crate) result: DatabaseInfo,
     255              : }
     256              : 
     257              : impl KickSession<'_> {
     258            1 :     fn parse_db_info<'de, D>(des: D) -> Result<DatabaseInfo, D::Error>
     259            1 :     where
     260            1 :         D: serde::Deserializer<'de>,
     261            1 :     {
     262            1 :         #[derive(Deserialize)]
     263              :         enum Wrapper {
     264              :             // Currently, console only reports `Success`.
     265              :             // `Failure(String)` used to be here... RIP.
     266              :             Success(DatabaseInfo),
     267              :         }
     268              : 
     269            1 :         Wrapper::deserialize(des).map(|x| match x {
     270            1 :             Wrapper::Success(info) => info,
     271            1 :         })
     272            1 :     }
     273              : }
     274              : 
     275              : /// Compute node connection params.
     276           31 : #[derive(Deserialize)]
     277              : pub(crate) struct DatabaseInfo {
     278              :     pub(crate) host: Box<str>,
     279              :     pub(crate) port: u16,
     280              :     pub(crate) dbname: Box<str>,
     281              :     pub(crate) user: Box<str>,
     282              :     /// Console always provides a password, but it might
     283              :     /// be inconvenient for debug with local PG instance.
     284              :     pub(crate) password: Option<Box<str>>,
     285              :     pub(crate) aux: MetricsAuxInfo,
     286              :     #[serde(default)]
     287              :     pub(crate) allowed_ips: Option<Vec<IpPattern>>,
     288              :     #[serde(default)]
     289              :     pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
     290              :     #[serde(default)]
     291              :     pub(crate) public_access_allowed: Option<bool>,
     292              : }
     293              : 
     294              : // Manually implement debug to omit sensitive info.
     295              : impl fmt::Debug for DatabaseInfo {
     296            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     297            0 :         f.debug_struct("DatabaseInfo")
     298            0 :             .field("host", &self.host)
     299            0 :             .field("port", &self.port)
     300            0 :             .field("dbname", &self.dbname)
     301            0 :             .field("user", &self.user)
     302            0 :             .field("allowed_ips", &self.allowed_ips)
     303            0 :             .field("allowed_vpc_endpoint_ids", &self.allowed_vpc_endpoint_ids)
     304            0 :             .finish_non_exhaustive()
     305            0 :     }
     306              : }
     307              : 
     308              : /// Various labels for prometheus metrics.
     309              : /// Also known as `ProxyMetricsAuxInfo` in the console.
     310           24 : #[derive(Debug, Deserialize, Clone)]
     311              : pub(crate) struct MetricsAuxInfo {
     312              :     pub(crate) endpoint_id: EndpointIdInt,
     313              :     pub(crate) project_id: ProjectIdInt,
     314              :     pub(crate) branch_id: BranchIdInt,
     315              :     #[serde(default)]
     316              :     pub(crate) cold_start_info: ColdStartInfo,
     317              : }
     318              : 
     319            6 : #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, FixedCardinalityLabel)]
     320              : #[serde(rename_all = "snake_case")]
     321              : pub enum ColdStartInfo {
     322              :     #[default]
     323              :     Unknown,
     324              :     /// Compute was already running
     325              :     Warm,
     326              :     #[serde(rename = "pool_hit")]
     327              :     #[label(rename = "pool_hit")]
     328              :     /// Compute was not running but there was an available VM
     329              :     VmPoolHit,
     330              :     #[serde(rename = "pool_miss")]
     331              :     #[label(rename = "pool_miss")]
     332              :     /// Compute was not running and there were no VMs available
     333              :     VmPoolMiss,
     334              : 
     335              :     // not provided by control plane
     336              :     /// Connection available from HTTP pool
     337              :     HttpPoolHit,
     338              :     /// Cached connection info
     339              :     WarmCached,
     340              : }
     341              : 
     342              : impl ColdStartInfo {
     343            0 :     pub(crate) fn as_str(self) -> &'static str {
     344            0 :         match self {
     345            0 :             ColdStartInfo::Unknown => "unknown",
     346            0 :             ColdStartInfo::Warm => "warm",
     347            0 :             ColdStartInfo::VmPoolHit => "pool_hit",
     348            0 :             ColdStartInfo::VmPoolMiss => "pool_miss",
     349            0 :             ColdStartInfo::HttpPoolHit => "http_pool_hit",
     350            0 :             ColdStartInfo::WarmCached => "warm_cached",
     351              :         }
     352            0 :     }
     353              : }
     354              : 
     355            0 : #[derive(Debug, Deserialize, Clone)]
     356              : pub struct EndpointJwksResponse {
     357              :     pub jwks: Vec<JwksSettings>,
     358              : }
     359              : 
     360            0 : #[derive(Debug, Deserialize, Clone)]
     361              : pub struct JwksSettings {
     362              :     pub id: String,
     363              :     pub jwks_url: url::Url,
     364              :     #[serde(rename = "provider_name")]
     365              :     pub _provider_name: String,
     366              :     pub jwt_audience: Option<String>,
     367              :     pub role_names: Vec<RoleNameInt>,
     368              : }
     369              : 
     370              : #[cfg(test)]
     371              : mod tests {
     372              :     use serde_json::json;
     373              : 
     374              :     use super::*;
     375              : 
     376            6 :     fn dummy_aux() -> serde_json::Value {
     377            6 :         json!({
     378            6 :             "endpoint_id": "endpoint",
     379            6 :             "project_id": "project",
     380            6 :             "branch_id": "branch",
     381            6 :             "cold_start_info": "unknown",
     382            6 :         })
     383            6 :     }
     384              : 
     385              :     #[test]
     386            1 :     fn parse_kick_session() -> anyhow::Result<()> {
     387            1 :         // This is what the console's kickResponse looks like.
     388            1 :         let json = json!({
     389            1 :             "session_id": "deadbeef",
     390            1 :             "result": {
     391            1 :                 "Success": {
     392            1 :                     "host": "localhost",
     393            1 :                     "port": 5432,
     394            1 :                     "dbname": "postgres",
     395            1 :                     "user": "john_doe",
     396            1 :                     "password": "password",
     397            1 :                     "aux": dummy_aux(),
     398            1 :                 }
     399            1 :             }
     400            1 :         });
     401            1 :         serde_json::from_str::<KickSession<'_>>(&json.to_string())?;
     402              : 
     403            1 :         Ok(())
     404            1 :     }
     405              : 
     406              :     #[test]
     407            1 :     fn parse_db_info() -> anyhow::Result<()> {
     408            1 :         // with password
     409            1 :         serde_json::from_value::<DatabaseInfo>(json!({
     410            1 :             "host": "localhost",
     411            1 :             "port": 5432,
     412            1 :             "dbname": "postgres",
     413            1 :             "user": "john_doe",
     414            1 :             "password": "password",
     415            1 :             "aux": dummy_aux(),
     416            1 :         }))?;
     417              : 
     418              :         // without password
     419            1 :         serde_json::from_value::<DatabaseInfo>(json!({
     420            1 :             "host": "localhost",
     421            1 :             "port": 5432,
     422            1 :             "dbname": "postgres",
     423            1 :             "user": "john_doe",
     424            1 :             "aux": dummy_aux(),
     425            1 :         }))?;
     426              : 
     427              :         // new field (forward compatibility)
     428            1 :         serde_json::from_value::<DatabaseInfo>(json!({
     429            1 :             "host": "localhost",
     430            1 :             "port": 5432,
     431            1 :             "dbname": "postgres",
     432            1 :             "user": "john_doe",
     433            1 :             "project": "hello_world",
     434            1 :             "N.E.W": "forward compatibility check",
     435            1 :             "aux": dummy_aux(),
     436            1 :         }))?;
     437              : 
     438              :         // with allowed_ips
     439            1 :         let dbinfo = serde_json::from_value::<DatabaseInfo>(json!({
     440            1 :             "host": "localhost",
     441            1 :             "port": 5432,
     442            1 :             "dbname": "postgres",
     443            1 :             "user": "john_doe",
     444            1 :             "password": "password",
     445            1 :             "aux": dummy_aux(),
     446            1 :             "allowed_ips": ["127.0.0.1"],
     447            1 :         }))?;
     448              : 
     449            1 :         assert_eq!(
     450            1 :             dbinfo.allowed_ips,
     451            1 :             Some(vec![IpPattern::Single("127.0.0.1".parse()?)])
     452              :         );
     453              : 
     454            1 :         Ok(())
     455            1 :     }
     456              : 
     457              :     #[test]
     458            1 :     fn parse_wake_compute() -> anyhow::Result<()> {
     459            1 :         let json = json!({
     460            1 :             "address": "0.0.0.0",
     461            1 :             "aux": dummy_aux(),
     462            1 :         });
     463            1 :         serde_json::from_str::<WakeCompute>(&json.to_string())?;
     464            1 :         Ok(())
     465            1 :     }
     466              : 
     467              :     #[test]
     468            1 :     fn parse_get_role_secret() -> anyhow::Result<()> {
     469            1 :         // Empty `allowed_ips` and `allowed_vpc_endpoint_ids` field.
     470            1 :         let json = json!({
     471            1 :             "role_secret": "secret",
     472            1 :         });
     473            1 :         serde_json::from_str::<GetEndpointAccessControl>(&json.to_string())?;
     474            1 :         let json = json!({
     475            1 :             "role_secret": "secret",
     476            1 :             "allowed_ips": ["8.8.8.8"],
     477            1 :         });
     478            1 :         serde_json::from_str::<GetEndpointAccessControl>(&json.to_string())?;
     479            1 :         let json = json!({
     480            1 :             "role_secret": "secret",
     481            1 :             "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"],
     482            1 :         });
     483            1 :         serde_json::from_str::<GetEndpointAccessControl>(&json.to_string())?;
     484            1 :         let json = json!({
     485            1 :             "role_secret": "secret",
     486            1 :             "allowed_ips": ["8.8.8.8"],
     487            1 :             "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"],
     488            1 :         });
     489            1 :         serde_json::from_str::<GetEndpointAccessControl>(&json.to_string())?;
     490            1 :         let json = json!({
     491            1 :             "role_secret": "secret",
     492            1 :             "allowed_ips": ["8.8.8.8"],
     493            1 :             "allowed_vpc_endpoint_ids": ["vpce-0abcd1234567890ef"],
     494            1 :             "project_id": "project",
     495            1 :         });
     496            1 :         serde_json::from_str::<GetEndpointAccessControl>(&json.to_string())?;
     497              : 
     498            1 :         Ok(())
     499            1 :     }
     500              : }
        

Generated by: LCOV version 2.1-beta