Line data Source code
1 : use std::fmt;
2 :
3 : use async_trait::async_trait;
4 : use postgres_client::config::SslMode;
5 : use pq_proto::BeMessage as Be;
6 : use thiserror::Error;
7 : use tokio::io::{AsyncRead, AsyncWrite};
8 : use tracing::{info, info_span};
9 :
10 : use super::{ComputeCredentialKeys, ControlPlaneApi};
11 : use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
12 : use crate::auth::IpPattern;
13 : use crate::cache::Cached;
14 : use crate::config::AuthenticationConfig;
15 : use crate::context::RequestContext;
16 : use crate::control_plane::client::cplane_proxy_v1;
17 : use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
18 : use crate::error::{ReportableError, UserFacingError};
19 : use crate::proxy::connect_compute::ComputeConnectBackend;
20 : use crate::proxy::NeonOptions;
21 : use crate::stream::PqStream;
22 : use crate::types::RoleName;
23 : use crate::{auth, compute, waiters};
24 :
25 : #[derive(Debug, Error)]
26 : pub(crate) enum ConsoleRedirectError {
27 : #[error(transparent)]
28 : WaiterRegister(#[from] waiters::RegisterError),
29 :
30 : #[error(transparent)]
31 : WaiterWait(#[from] waiters::WaitError),
32 :
33 : #[error(transparent)]
34 : Io(#[from] std::io::Error),
35 : }
36 :
37 : #[derive(Debug)]
38 : pub struct ConsoleRedirectBackend {
39 : console_uri: reqwest::Url,
40 : api: cplane_proxy_v1::NeonControlPlaneClient,
41 : }
42 :
43 : impl fmt::Debug for cplane_proxy_v1::NeonControlPlaneClient {
44 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 0 : write!(f, "NeonControlPlaneClient")
46 0 : }
47 : }
48 :
49 : impl UserFacingError for ConsoleRedirectError {
50 0 : fn to_string_client(&self) -> String {
51 0 : "Internal error".to_string()
52 0 : }
53 : }
54 :
55 : impl ReportableError for ConsoleRedirectError {
56 0 : fn get_error_kind(&self) -> crate::error::ErrorKind {
57 0 : match self {
58 0 : Self::WaiterRegister(_) => crate::error::ErrorKind::Service,
59 0 : Self::WaiterWait(_) => crate::error::ErrorKind::Service,
60 0 : Self::Io(_) => crate::error::ErrorKind::ClientDisconnect,
61 : }
62 0 : }
63 : }
64 :
65 0 : fn hello_message(
66 0 : redirect_uri: &reqwest::Url,
67 0 : session_id: &str,
68 0 : duration: std::time::Duration,
69 0 : ) -> String {
70 0 : let formatted_duration = humantime::format_duration(duration).to_string();
71 0 : format!(
72 0 : concat![
73 0 : "Welcome to Neon!\n",
74 0 : "Authenticate by visiting (will expire in {duration}):\n",
75 0 : " {redirect_uri}{session_id}\n\n",
76 0 : ],
77 0 : duration = formatted_duration,
78 0 : redirect_uri = redirect_uri,
79 0 : session_id = session_id,
80 0 : )
81 0 : }
82 :
83 0 : pub(crate) fn new_psql_session_id() -> String {
84 0 : hex::encode(rand::random::<[u8; 8]>())
85 0 : }
86 :
87 : #[async_trait]
88 : impl BackendIpAllowlist for ConsoleRedirectBackend {
89 0 : async fn get_allowed_ips(
90 0 : &self,
91 0 : ctx: &RequestContext,
92 0 : user_info: &ComputeUserInfo,
93 0 : ) -> auth::Result<Vec<auth::IpPattern>> {
94 0 : self.api
95 0 : .get_allowed_ips_and_secret(ctx, user_info)
96 0 : .await
97 0 : .map(|(ips, _)| ips.as_ref().clone())
98 0 : .map_err(|e| e.into())
99 0 : }
100 : }
101 :
102 : impl ConsoleRedirectBackend {
103 0 : pub fn new(console_uri: reqwest::Url, api: cplane_proxy_v1::NeonControlPlaneClient) -> Self {
104 0 : Self { console_uri, api }
105 0 : }
106 :
107 0 : pub(crate) async fn authenticate(
108 0 : &self,
109 0 : ctx: &RequestContext,
110 0 : auth_config: &'static AuthenticationConfig,
111 0 : client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
112 0 : ) -> auth::Result<(
113 0 : ConsoleRedirectNodeInfo,
114 0 : ComputeUserInfo,
115 0 : Option<Vec<IpPattern>>,
116 0 : )> {
117 0 : authenticate(ctx, auth_config, &self.console_uri, client)
118 0 : .await
119 0 : .map(|(node_info, user_info, ip_allowlist)| {
120 0 : (ConsoleRedirectNodeInfo(node_info), user_info, ip_allowlist)
121 0 : })
122 0 : }
123 : }
124 :
125 : pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
126 :
127 : #[async_trait]
128 : impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
129 0 : async fn wake_compute(
130 0 : &self,
131 0 : _ctx: &RequestContext,
132 0 : ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
133 0 : Ok(Cached::new_uncached(self.0.clone()))
134 0 : }
135 :
136 0 : fn get_keys(&self) -> &ComputeCredentialKeys {
137 0 : &ComputeCredentialKeys::None
138 0 : }
139 : }
140 :
141 0 : async fn authenticate(
142 0 : ctx: &RequestContext,
143 0 : auth_config: &'static AuthenticationConfig,
144 0 : link_uri: &reqwest::Url,
145 0 : client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
146 0 : ) -> auth::Result<(NodeInfo, ComputeUserInfo, Option<Vec<IpPattern>>)> {
147 0 : ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
148 :
149 : // registering waiter can fail if we get unlucky with rng.
150 : // just try again.
151 0 : let (psql_session_id, waiter) = loop {
152 0 : let psql_session_id = new_psql_session_id();
153 0 :
154 0 : match control_plane::mgmt::get_waiter(&psql_session_id) {
155 0 : Ok(waiter) => break (psql_session_id, waiter),
156 0 : Err(_e) => continue,
157 : }
158 : };
159 :
160 0 : let span = info_span!("console_redirect", psql_session_id = &psql_session_id);
161 0 : let greeting = hello_message(
162 0 : link_uri,
163 0 : &psql_session_id,
164 0 : auth_config.console_redirect_confirmation_timeout,
165 0 : );
166 0 :
167 0 : // Give user a URL to spawn a new database.
168 0 : info!(parent: &span, "sending the auth URL to the user");
169 0 : client
170 0 : .write_message_noflush(&Be::AuthenticationOk)?
171 0 : .write_message_noflush(&Be::CLIENT_ENCODING)?
172 0 : .write_message(&Be::NoticeResponse(&greeting))
173 0 : .await?;
174 :
175 : // Wait for console response via control plane (see `mgmt`).
176 0 : info!(parent: &span, "waiting for console's reply...");
177 0 : let db_info = tokio::time::timeout(auth_config.console_redirect_confirmation_timeout, waiter)
178 0 : .await
179 0 : .map_err(|_elapsed| {
180 0 : auth::AuthError::confirmation_timeout(
181 0 : auth_config.console_redirect_confirmation_timeout.into(),
182 0 : )
183 0 : })?
184 0 : .map_err(ConsoleRedirectError::from)?;
185 :
186 0 : if auth_config.ip_allowlist_check_enabled {
187 0 : if let Some(allowed_ips) = &db_info.allowed_ips {
188 0 : if !auth::check_peer_addr_is_in_list(&ctx.peer_addr(), allowed_ips) {
189 0 : return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
190 0 : }
191 0 : }
192 0 : }
193 :
194 0 : client.write_message_noflush(&Be::NoticeResponse("Connecting to database."))?;
195 :
196 : // This config should be self-contained, because we won't
197 : // take username or dbname from client's startup message.
198 0 : let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port);
199 0 : config.dbname(&db_info.dbname).user(&db_info.user);
200 0 :
201 0 : let user: RoleName = db_info.user.into();
202 0 : let user_info = ComputeUserInfo {
203 0 : endpoint: db_info.aux.endpoint_id.as_str().into(),
204 0 : user: user.clone(),
205 0 : options: NeonOptions::default(),
206 0 : };
207 0 :
208 0 : ctx.set_dbname(db_info.dbname.into());
209 0 : ctx.set_user(user);
210 0 : ctx.set_project(db_info.aux.clone());
211 0 : info!("woken up a compute node");
212 :
213 : // Backwards compatibility. pg_sni_proxy uses "--" in domain names
214 : // while direct connections do not. Once we migrate to pg_sni_proxy
215 : // everywhere, we can remove this.
216 0 : if db_info.host.contains("--") {
217 0 : // we need TLS connection with SNI info to properly route it
218 0 : config.ssl_mode(SslMode::Require);
219 0 : } else {
220 0 : config.ssl_mode(SslMode::Disable);
221 0 : }
222 :
223 0 : if let Some(password) = db_info.password {
224 0 : config.password(password.as_ref());
225 0 : }
226 :
227 0 : Ok((
228 0 : NodeInfo {
229 0 : config,
230 0 : aux: db_info.aux,
231 0 : },
232 0 : user_info,
233 0 : db_info.allowed_ips,
234 0 : ))
235 0 : }
|