Line data Source code
1 : use anyhow::{anyhow, bail, Result};
2 : use reqwest::StatusCode;
3 : use std::fs::File;
4 : use std::path::Path;
5 : use tokio_postgres::Client;
6 : use tracing::{error, info, instrument, warn};
7 :
8 : use crate::config;
9 : use crate::metrics::{CPlaneRequestRPC, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
10 : use crate::migration::MigrationRunner;
11 : use crate::params::PG_HBA_ALL_MD5;
12 : use crate::pg_helpers::*;
13 :
14 : use compute_api::responses::{
15 : ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
16 : };
17 : use compute_api::spec::ComputeSpec;
18 :
19 : // Do control plane request and return response if any. In case of error it
20 : // returns a bool flag indicating whether it makes sense to retry the request
21 : // and a string with error message.
22 0 : fn do_control_plane_request(
23 0 : uri: &str,
24 0 : jwt: &str,
25 0 : ) -> Result<ControlPlaneSpecResponse, (bool, String, String)> {
26 0 : let resp = reqwest::blocking::Client::new()
27 0 : .get(uri)
28 0 : .header("Authorization", format!("Bearer {}", jwt))
29 0 : .send()
30 0 : .map_err(|e| {
31 0 : (
32 0 : true,
33 0 : format!("could not perform spec request to control plane: {:?}", e),
34 0 : UNKNOWN_HTTP_STATUS.to_string(),
35 0 : )
36 0 : })?;
37 :
38 0 : let status = resp.status();
39 0 : match status {
40 0 : StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
41 0 : Ok(spec_resp) => Ok(spec_resp),
42 0 : Err(e) => Err((
43 0 : true,
44 0 : format!("could not deserialize control plane response: {:?}", e),
45 0 : status.to_string(),
46 0 : )),
47 : },
48 0 : StatusCode::SERVICE_UNAVAILABLE => Err((
49 0 : true,
50 0 : "control plane is temporarily unavailable".to_string(),
51 0 : status.to_string(),
52 0 : )),
53 : StatusCode::BAD_GATEWAY => {
54 : // We have a problem with intermittent 502 errors now
55 : // https://github.com/neondatabase/cloud/issues/2353
56 : // It's fine to retry GET request in this case.
57 0 : Err((
58 0 : true,
59 0 : "control plane request failed with 502".to_string(),
60 0 : status.to_string(),
61 0 : ))
62 : }
63 : // Another code, likely 500 or 404, means that compute is unknown to the control plane
64 : // or some internal failure happened. Doesn't make much sense to retry in this case.
65 0 : _ => Err((
66 0 : false,
67 0 : format!("unexpected control plane response status code: {}", status),
68 0 : status.to_string(),
69 0 : )),
70 : }
71 0 : }
72 :
73 : /// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
74 : /// env variable is set, it will be used for authorization.
75 0 : pub fn get_spec_from_control_plane(
76 0 : base_uri: &str,
77 0 : compute_id: &str,
78 0 : ) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
79 0 : let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
80 0 : let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
81 0 : Ok(v) => v,
82 0 : Err(_) => "".to_string(),
83 : };
84 0 : let mut attempt = 1;
85 0 :
86 0 : info!("getting spec from control plane: {}", cp_uri);
87 :
88 : // Do 3 attempts to get spec from the control plane using the following logic:
89 : // - network error -> then retry
90 : // - compute id is unknown or any other error -> bail out
91 : // - no spec for compute yet (Empty state) -> return Ok(None)
92 : // - got spec -> return Ok(Some(spec))
93 0 : while attempt < 4 {
94 0 : let result = match do_control_plane_request(&cp_uri, &jwt) {
95 0 : Ok(spec_resp) => {
96 0 : CPLANE_REQUESTS_TOTAL
97 0 : .with_label_values(&[
98 0 : CPlaneRequestRPC::GetSpec.as_str(),
99 0 : &StatusCode::OK.to_string(),
100 0 : ])
101 0 : .inc();
102 0 : match spec_resp.status {
103 0 : ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
104 : ControlPlaneComputeStatus::Attached => {
105 0 : if let Some(spec) = spec_resp.spec {
106 0 : Ok((Some(spec), spec_resp.compute_ctl_config))
107 : } else {
108 0 : bail!("compute is attached, but spec is empty")
109 : }
110 : }
111 : }
112 : }
113 0 : Err((retry, msg, status)) => {
114 0 : CPLANE_REQUESTS_TOTAL
115 0 : .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status])
116 0 : .inc();
117 0 : if retry {
118 0 : Err(anyhow!(msg))
119 : } else {
120 0 : bail!(msg);
121 : }
122 : }
123 : };
124 :
125 0 : if let Err(e) = &result {
126 0 : error!("attempt {} to get spec failed with: {}", attempt, e);
127 : } else {
128 0 : return result;
129 : }
130 :
131 0 : attempt += 1;
132 0 : std::thread::sleep(std::time::Duration::from_millis(100));
133 : }
134 :
135 : // All attempts failed, return error.
136 0 : Err(anyhow::anyhow!(
137 0 : "Exhausted all attempts to retrieve the spec from the control plane"
138 0 : ))
139 0 : }
140 :
141 : /// Check `pg_hba.conf` and update if needed to allow external connections.
142 0 : pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
143 0 : // XXX: consider making it a part of spec.json
144 0 : info!("checking pg_hba.conf");
145 0 : let pghba_path = pgdata_path.join("pg_hba.conf");
146 0 :
147 0 : if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
148 0 : info!("updated pg_hba.conf to allow external connections");
149 : } else {
150 0 : info!("pg_hba.conf is up-to-date");
151 : }
152 :
153 0 : Ok(())
154 0 : }
155 :
156 : /// Create a standby.signal file
157 0 : pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
158 0 : // XXX: consider making it a part of spec.json
159 0 : info!("adding standby.signal");
160 0 : let signalfile = pgdata_path.join("standby.signal");
161 0 :
162 0 : if !signalfile.exists() {
163 0 : info!("created standby.signal");
164 0 : File::create(signalfile)?;
165 : } else {
166 0 : info!("reused pre-existing standby.signal");
167 : }
168 0 : Ok(())
169 0 : }
170 :
171 : #[instrument(skip_all)]
172 : pub async fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
173 : info!("handle neon extension upgrade");
174 : let query = "ALTER EXTENSION neon UPDATE";
175 : info!("update neon extension version with query: {}", query);
176 : client.simple_query(query).await?;
177 :
178 : Ok(())
179 : }
180 :
181 : #[instrument(skip_all)]
182 : pub async fn handle_migrations(client: &mut Client) -> Result<()> {
183 : info!("handle migrations");
184 :
185 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
186 : // !BE SURE TO ONLY ADD MIGRATIONS TO THE END OF THIS ARRAY. IF YOU DO NOT, VERY VERY BAD THINGS MAY HAPPEN!
187 : // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
188 :
189 : // Add new migrations in numerical order.
190 : let migrations = [
191 : include_str!("./migrations/0001-neon_superuser_bypass_rls.sql"),
192 : include_str!("./migrations/0002-alter_roles.sql"),
193 : include_str!("./migrations/0003-grant_pg_create_subscription_to_neon_superuser.sql"),
194 : include_str!("./migrations/0004-grant_pg_monitor_to_neon_superuser.sql"),
195 : include_str!("./migrations/0005-grant_all_on_tables_to_neon_superuser.sql"),
196 : include_str!("./migrations/0006-grant_all_on_sequences_to_neon_superuser.sql"),
197 : include_str!(
198 : "./migrations/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql"
199 : ),
200 : include_str!(
201 : "./migrations/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql"
202 : ),
203 : include_str!("./migrations/0009-revoke_replication_for_previously_allowed_roles.sql"),
204 : include_str!(
205 : "./migrations/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql"
206 : ),
207 : include_str!(
208 : "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
209 : ),
210 : ];
211 :
212 : MigrationRunner::new(client, &migrations)
213 : .run_migrations()
214 : .await?;
215 :
216 : Ok(())
217 : }
218 :
219 : /// Connect to the database as superuser and pre-create anon extension
220 : /// if it is present in shared_preload_libraries
221 : #[instrument(skip_all)]
222 : pub async fn handle_extension_anon(
223 : spec: &ComputeSpec,
224 : db_owner: &str,
225 : db_client: &mut Client,
226 : grants_only: bool,
227 : ) -> Result<()> {
228 : info!("handle extension anon");
229 :
230 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
231 : if libs.contains("anon") {
232 : if !grants_only {
233 : // check if extension is already initialized using anon.is_initialized()
234 : let query = "SELECT anon.is_initialized()";
235 : match db_client.query(query, &[]).await {
236 : Ok(rows) => {
237 : if !rows.is_empty() {
238 : let is_initialized: bool = rows[0].get(0);
239 : if is_initialized {
240 : info!("anon extension is already initialized");
241 : return Ok(());
242 : }
243 : }
244 : }
245 : Err(e) => {
246 : warn!(
247 : "anon extension is_installed check failed with expected error: {}",
248 : e
249 : );
250 : }
251 : };
252 :
253 : // Create anon extension if this compute needs it
254 : // Users cannot create it themselves, because superuser is required.
255 : let mut query = "CREATE EXTENSION IF NOT EXISTS anon CASCADE";
256 : info!("creating anon extension with query: {}", query);
257 : match db_client.query(query, &[]).await {
258 : Ok(_) => {}
259 : Err(e) => {
260 : error!("anon extension creation failed with error: {}", e);
261 : return Ok(());
262 : }
263 : }
264 :
265 : // check that extension is installed
266 : query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
267 : let rows = db_client.query(query, &[]).await?;
268 : if rows.is_empty() {
269 : error!("anon extension is not installed");
270 : return Ok(());
271 : }
272 :
273 : // Initialize anon extension
274 : // This also requires superuser privileges, so users cannot do it themselves.
275 : query = "SELECT anon.init()";
276 : match db_client.query(query, &[]).await {
277 : Ok(_) => {}
278 : Err(e) => {
279 : error!("anon.init() failed with error: {}", e);
280 : return Ok(());
281 : }
282 : }
283 : }
284 :
285 : // check that extension is installed, if not bail early
286 : let query = "SELECT extname FROM pg_extension WHERE extname = 'anon'";
287 : match db_client.query(query, &[]).await {
288 : Ok(rows) => {
289 : if rows.is_empty() {
290 : error!("anon extension is not installed");
291 : return Ok(());
292 : }
293 : }
294 : Err(e) => {
295 : error!("anon extension check failed with error: {}", e);
296 : return Ok(());
297 : }
298 : };
299 :
300 : let query = format!("GRANT ALL ON SCHEMA anon TO {}", db_owner);
301 : info!("granting anon extension permissions with query: {}", query);
302 : db_client.simple_query(&query).await?;
303 :
304 : // Grant permissions to db_owner to use anon extension functions
305 : let query = format!("GRANT ALL ON ALL FUNCTIONS IN SCHEMA anon TO {}", db_owner);
306 : info!("granting anon extension permissions with query: {}", query);
307 : db_client.simple_query(&query).await?;
308 :
309 : // This is needed, because some functions are defined as SECURITY DEFINER.
310 : // In Postgres SECURITY DEFINER functions are executed with the privileges
311 : // of the owner.
312 : // In anon extension this it is needed to access some GUCs, which are only accessible to
313 : // superuser. But we've patched postgres to allow db_owner to access them as well.
314 : // So we need to change owner of these functions to db_owner.
315 : let query = format!("
316 : SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};'
317 : from pg_proc p
318 : join pg_namespace nsp ON p.pronamespace = nsp.oid
319 : where nsp.nspname = 'anon';", db_owner);
320 :
321 : info!("change anon extension functions owner to db owner");
322 : db_client.simple_query(&query).await?;
323 :
324 : // affects views as well
325 : let query = format!("GRANT ALL ON ALL TABLES IN SCHEMA anon TO {}", db_owner);
326 : info!("granting anon extension permissions with query: {}", query);
327 : db_client.simple_query(&query).await?;
328 :
329 : let query = format!("GRANT ALL ON ALL SEQUENCES IN SCHEMA anon TO {}", db_owner);
330 : info!("granting anon extension permissions with query: {}", query);
331 : db_client.simple_query(&query).await?;
332 : }
333 : }
334 :
335 : Ok(())
336 : }
|