Line data Source code
1 : use std::collections::{HashMap, HashSet};
2 : use std::env;
3 : use std::fs;
4 : use std::iter::once;
5 : use std::os::unix::fs::{symlink, PermissionsExt};
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::atomic::AtomicU32;
10 : use std::sync::atomic::Ordering;
11 : use std::sync::{Arc, Condvar, Mutex, RwLock};
12 : use std::thread;
13 : use std::time::Duration;
14 : use std::time::Instant;
15 :
16 : use anyhow::{Context, Result};
17 : use chrono::{DateTime, Utc};
18 : use compute_api::spec::{Database, PgIdent, Role};
19 : use futures::future::join_all;
20 : use futures::stream::FuturesUnordered;
21 : use futures::StreamExt;
22 : use nix::unistd::Pid;
23 : use postgres;
24 : use postgres::error::SqlState;
25 : use postgres::NoTls;
26 : use tracing::{debug, error, info, instrument, warn};
27 : use utils::id::{TenantId, TimelineId};
28 : use utils::lsn::Lsn;
29 :
30 : use compute_api::privilege::Privilege;
31 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
32 : use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion};
33 : use utils::measured_stream::MeasuredReader;
34 :
35 : use nix::sys::signal::{kill, Signal};
36 : use remote_storage::{DownloadError, RemotePath};
37 : use tokio::spawn;
38 :
39 : use crate::installed_extensions::get_installed_extensions;
40 : use crate::local_proxy;
41 : use crate::pg_helpers::*;
42 : use crate::spec::*;
43 : use crate::spec_apply::ApplySpecPhase::{
44 : CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
45 : CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
46 : HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
47 : RunInEachDatabase,
48 : };
49 : use crate::spec_apply::PerDatabasePhase;
50 : use crate::spec_apply::PerDatabasePhase::{
51 : ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
52 : };
53 : use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
54 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
55 : use crate::{config, extension_server};
56 :
57 : pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
58 : pub static PG_PID: AtomicU32 = AtomicU32::new(0);
59 :
60 : /// Compute node info shared across several `compute_ctl` threads.
61 : pub struct ComputeNode {
62 : // Url type maintains proper escaping
63 : pub connstr: url::Url,
64 : // We connect to Postgres from many different places, so build configs once
65 : // and reuse them where needed.
66 : pub conn_conf: postgres::config::Config,
67 : pub tokio_conn_conf: tokio_postgres::config::Config,
68 : pub pgdata: String,
69 : pub pgbin: String,
70 : pub pgversion: String,
71 : /// We should only allow live configuration or reconfiguration of the compute
72 : /// node if it uses the 'pull model', i.e. it can go to the control plane and
73 : /// fetch the latest configuration. Otherwise, there could be a case:
74 : /// - we start compute with some spec provided as argument
75 : /// - we push new spec and it does reconfiguration
76 : /// - but then something happens and compute pod / VM is destroyed,
77 : /// so k8s controller starts it again with the **old** spec
78 : ///
79 : /// and the same for empty computes:
80 : /// - we started compute without any spec
81 : /// - we push spec and it does configuration
82 : /// - but then it is restarted without any spec again
83 : pub live_config_allowed: bool,
84 : /// The port that the compute's HTTP server listens on
85 : pub http_port: u16,
86 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
87 : /// To allow the HTTP API server to serve status requests while configuration
88 : /// is in progress, the lock should be held only for short periods of time to do
89 : /// a read or write, not for the whole configuration process.
90 : pub state: Mutex<ComputeState>,
91 : /// `Condvar` to allow notifying waiters about state changes.
92 : pub state_changed: Condvar,
93 : /// The address of the extension storage proxy gateway.
94 : pub ext_remote_storage: Option<String>,
95 : // key: ext_archive_name, value: (download start time, download_completed)
96 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
97 : pub build_tag: String,
98 : }
99 :
100 : // Store some metrics about download sizes that might impact startup time.
101 : #[derive(Clone, Debug)]
102 : pub struct RemoteExtensionMetrics {
103 : num_ext_downloaded: u64,
104 : largest_ext_size: u64,
105 : total_ext_download_size: u64,
106 : }
107 :
108 : #[derive(Clone, Debug)]
109 : pub struct ComputeState {
110 : pub start_time: DateTime<Utc>,
111 : pub status: ComputeStatus,
112 : /// Timestamp of the last Postgres activity. It could be `None` if
113 : /// the compute wasn't used since start.
114 : pub last_active: Option<DateTime<Utc>>,
115 : pub error: Option<String>,
116 : pub pspec: Option<ParsedSpec>,
117 : pub metrics: ComputeMetrics,
118 : }
119 :
120 : impl ComputeState {
121 0 : pub fn new() -> Self {
122 0 : Self {
123 0 : start_time: Utc::now(),
124 0 : status: ComputeStatus::Empty,
125 0 : last_active: None,
126 0 : error: None,
127 0 : pspec: None,
128 0 : metrics: ComputeMetrics::default(),
129 0 : }
130 0 : }
131 :
132 0 : pub fn set_status(&mut self, status: ComputeStatus, state_changed: &Condvar) {
133 0 : let prev = self.status;
134 0 : info!("Changing compute status from {} to {}", prev, status);
135 0 : self.status = status;
136 0 : state_changed.notify_all();
137 0 : }
138 :
139 0 : pub fn set_failed_status(&mut self, err: anyhow::Error, state_changed: &Condvar) {
140 0 : self.error = Some(format!("{err:?}"));
141 0 : self.set_status(ComputeStatus::Failed, state_changed);
142 0 : }
143 : }
144 :
145 : impl Default for ComputeState {
146 0 : fn default() -> Self {
147 0 : Self::new()
148 0 : }
149 : }
150 :
151 : #[derive(Clone, Debug)]
152 : pub struct ParsedSpec {
153 : pub spec: ComputeSpec,
154 : pub tenant_id: TenantId,
155 : pub timeline_id: TimelineId,
156 : pub pageserver_connstr: String,
157 : pub safekeeper_connstrings: Vec<String>,
158 : pub storage_auth_token: Option<String>,
159 : }
160 :
161 : impl TryFrom<ComputeSpec> for ParsedSpec {
162 : type Error = String;
163 0 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
164 : // Extract the options from the spec file that are needed to connect to
165 : // the storage system.
166 : //
167 : // For backwards-compatibility, the top-level fields in the spec file
168 : // may be empty. In that case, we need to dig them from the GUCs in the
169 : // cluster.settings field.
170 0 : let pageserver_connstr = spec
171 0 : .pageserver_connstring
172 0 : .clone()
173 0 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
174 0 : .ok_or("pageserver connstr should be provided")?;
175 0 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
176 0 : if matches!(spec.mode, ComputeMode::Primary) {
177 0 : spec.cluster
178 0 : .settings
179 0 : .find("neon.safekeepers")
180 0 : .ok_or("safekeeper connstrings should be provided")?
181 0 : .split(',')
182 0 : .map(|str| str.to_string())
183 0 : .collect()
184 : } else {
185 0 : vec![]
186 : }
187 : } else {
188 0 : spec.safekeeper_connstrings.clone()
189 : };
190 0 : let storage_auth_token = spec.storage_auth_token.clone();
191 0 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
192 0 : tenant_id
193 : } else {
194 0 : spec.cluster
195 0 : .settings
196 0 : .find("neon.tenant_id")
197 0 : .ok_or("tenant id should be provided")
198 0 : .map(|s| TenantId::from_str(&s))?
199 0 : .or(Err("invalid tenant id"))?
200 : };
201 0 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
202 0 : timeline_id
203 : } else {
204 0 : spec.cluster
205 0 : .settings
206 0 : .find("neon.timeline_id")
207 0 : .ok_or("timeline id should be provided")
208 0 : .map(|s| TimelineId::from_str(&s))?
209 0 : .or(Err("invalid timeline id"))?
210 : };
211 :
212 0 : Ok(ParsedSpec {
213 0 : spec,
214 0 : pageserver_connstr,
215 0 : safekeeper_connstrings,
216 0 : storage_auth_token,
217 0 : tenant_id,
218 0 : timeline_id,
219 0 : })
220 0 : }
221 : }
222 :
223 : /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
224 : /// cgroup. Otherwise returns the default `Command::new(cmd)`
225 : ///
226 : /// This function should be used to start postgres, as it will start it in the
227 : /// neon-postgres cgroup if we are a VM. This allows autoscaling to control
228 : /// postgres' resource usage. The cgroup will exist in VMs because vm-builder
229 : /// creates it during the sysinit phase of its inittab.
230 0 : fn maybe_cgexec(cmd: &str) -> Command {
231 0 : // The cplane sets this env var for autoscaling computes.
232 0 : // Use `var_os` so we don't have to worry about the variable being valid
233 0 : // unicode. That should never be a concern, but just in case.
234 0 : if env::var_os("AUTOSCALING").is_some() {
235 0 : let mut command = Command::new("cgexec");
236 0 : command.args(["-g", "memory:neon-postgres"]);
237 0 : command.arg(cmd);
238 0 : command
239 : } else {
240 0 : Command::new(cmd)
241 : }
242 0 : }
243 :
244 0 : pub(crate) fn construct_superuser_query(spec: &ComputeSpec) -> String {
245 0 : let roles = spec
246 0 : .cluster
247 0 : .roles
248 0 : .iter()
249 0 : .map(|r| escape_literal(&r.name))
250 0 : .collect::<Vec<_>>();
251 0 :
252 0 : let dbs = spec
253 0 : .cluster
254 0 : .databases
255 0 : .iter()
256 0 : .map(|db| escape_literal(&db.name))
257 0 : .collect::<Vec<_>>();
258 :
259 0 : let roles_decl = if roles.is_empty() {
260 0 : String::from("roles text[] := NULL;")
261 : } else {
262 0 : format!(
263 0 : r#"
264 0 : roles text[] := ARRAY(SELECT rolname
265 0 : FROM pg_catalog.pg_roles
266 0 : WHERE rolname IN ({}));"#,
267 0 : roles.join(", ")
268 0 : )
269 : };
270 :
271 0 : let database_decl = if dbs.is_empty() {
272 0 : String::from("dbs text[] := NULL;")
273 : } else {
274 0 : format!(
275 0 : r#"
276 0 : dbs text[] := ARRAY(SELECT datname
277 0 : FROM pg_catalog.pg_database
278 0 : WHERE datname IN ({}));"#,
279 0 : dbs.join(", ")
280 0 : )
281 : };
282 :
283 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
284 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
285 0 : let query = format!(
286 0 : r#"
287 0 : DO $$
288 0 : DECLARE
289 0 : r text;
290 0 : {}
291 0 : {}
292 0 : BEGIN
293 0 : IF NOT EXISTS (
294 0 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
295 0 : THEN
296 0 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
297 0 : IF array_length(roles, 1) IS NOT NULL THEN
298 0 : EXECUTE format('GRANT neon_superuser TO %s',
299 0 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
300 0 : FOREACH r IN ARRAY roles LOOP
301 0 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
302 0 : END LOOP;
303 0 : END IF;
304 0 : IF array_length(dbs, 1) IS NOT NULL THEN
305 0 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
306 0 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
307 0 : END IF;
308 0 : END IF;
309 0 : END
310 0 : $$;"#,
311 0 : roles_decl, database_decl,
312 0 : );
313 0 :
314 0 : query
315 0 : }
316 :
317 : impl ComputeNode {
318 : /// Check that compute node has corresponding feature enabled.
319 0 : pub fn has_feature(&self, feature: ComputeFeature) -> bool {
320 0 : let state = self.state.lock().unwrap();
321 :
322 0 : if let Some(s) = state.pspec.as_ref() {
323 0 : s.spec.features.contains(&feature)
324 : } else {
325 0 : false
326 : }
327 0 : }
328 :
329 0 : pub fn set_status(&self, status: ComputeStatus) {
330 0 : let mut state = self.state.lock().unwrap();
331 0 : state.set_status(status, &self.state_changed);
332 0 : }
333 :
334 0 : pub fn set_failed_status(&self, err: anyhow::Error) {
335 0 : let mut state = self.state.lock().unwrap();
336 0 : state.set_failed_status(err, &self.state_changed);
337 0 : }
338 :
339 0 : pub fn get_status(&self) -> ComputeStatus {
340 0 : self.state.lock().unwrap().status
341 0 : }
342 :
343 0 : pub fn get_timeline_id(&self) -> Option<TimelineId> {
344 0 : self.state
345 0 : .lock()
346 0 : .unwrap()
347 0 : .pspec
348 0 : .as_ref()
349 0 : .map(|s| s.timeline_id)
350 0 : }
351 :
352 : // Remove the `pgdata` directory and create it again with the right permissions.
353 0 : fn create_pgdata(&self) -> Result<()> {
354 0 : // Ignore the removal error; most likely it is 'No such file or directory (os error 2)'.
355 0 : // If it is something different, then create_dir() will error out anyway.
356 0 : let _ok = fs::remove_dir_all(&self.pgdata);
357 0 : fs::create_dir(&self.pgdata)?;
358 0 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
359 :
360 0 : Ok(())
361 0 : }
362 :
363 : // Get a basebackup from the pageserver over a libpq connection and unpack it
364 : // into the `pgdata` directory, overwriting all of its previous content.
365 : #[instrument(skip_all, fields(%lsn))]
366 : fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
367 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
368 : let start_time = Instant::now();
369 :
370 : let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
371 : let mut config = postgres::Config::from_str(shard0_connstr)?;
372 :
373 : // Use the storage auth token from the config file, if given.
374 : // Note: this overrides any password set in the connection string.
375 : if let Some(storage_auth_token) = &spec.storage_auth_token {
376 : info!("Got storage auth token from spec file");
377 : config.password(storage_auth_token);
378 : } else {
379 : info!("Storage auth token not set");
380 : }
381 :
382 : // Connect to pageserver
383 : let mut client = config.connect(NoTls)?;
384 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
385 :
386 : let basebackup_cmd = match lsn {
387 : Lsn(0) => {
388 : if spec.spec.mode != ComputeMode::Primary {
389 : format!(
390 : "basebackup {} {} --gzip --replica",
391 : spec.tenant_id, spec.timeline_id
392 : )
393 : } else {
394 : format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id)
395 : }
396 : }
397 : _ => {
398 : if spec.spec.mode != ComputeMode::Primary {
399 : format!(
400 : "basebackup {} {} {} --gzip --replica",
401 : spec.tenant_id, spec.timeline_id, lsn
402 : )
403 : } else {
404 : format!(
405 : "basebackup {} {} {} --gzip",
406 : spec.tenant_id, spec.timeline_id, lsn
407 : )
408 : }
409 : }
410 : };
411 :
412 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
413 : let mut measured_reader = MeasuredReader::new(copyreader);
414 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
415 :
416 : // Read the archive directly from the `CopyOutReader`
417 : //
418 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
419 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
420 : // sends an Error after finishing the tarball, we will not notice it.
421 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
422 : ar.set_ignore_zeros(true);
423 : ar.unpack(&self.pgdata)?;
424 :
425 : // Report metrics
426 : let mut state = self.state.lock().unwrap();
427 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
428 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
429 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
430 : Ok(())
431 : }
432 :
433 : // Gets the basebackup in a retry loop
434 : #[instrument(skip_all, fields(%lsn))]
435 : pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
436 : let mut retry_period_ms = 500.0;
437 : let mut attempts = 0;
438 : const DEFAULT_ATTEMPTS: u16 = 10;
439 : #[cfg(feature = "testing")]
440 : let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") {
441 : u16::from_str(&v).unwrap()
442 : } else {
443 : DEFAULT_ATTEMPTS
444 : };
445 : #[cfg(not(feature = "testing"))]
446 : let max_attempts = DEFAULT_ATTEMPTS;
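// Retry with exponential backoff: the delay starts at 500 ms (retry_period_ms)
// and is multiplied by 1.5 after every failed attempt.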
447 : loop {
448 : let result = self.try_get_basebackup(compute_state, lsn);
449 : match result {
450 : Ok(_) => {
451 : return result;
452 : }
453 : Err(ref e) if attempts < max_attempts => {
454 : warn!(
455 : "Failed to get basebackup: {} (attempt {}/{})",
456 : e, attempts, max_attempts
457 : );
458 : std::thread::sleep(std::time::Duration::from_millis(retry_period_ms as u64));
459 : retry_period_ms *= 1.5;
460 : }
461 : Err(_) => {
462 : return result;
463 : }
464 : }
465 : attempts += 1;
466 : }
467 : }
468 :
469 0 : pub async fn check_safekeepers_synced_async(
470 0 : &self,
471 0 : compute_state: &ComputeState,
472 0 : ) -> Result<Option<Lsn>> {
473 0 : // Construct a connection config for each safekeeper
474 0 : let pspec: ParsedSpec = compute_state
475 0 : .pspec
476 0 : .as_ref()
477 0 : .expect("spec must be set")
478 0 : .clone();
479 0 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
480 0 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
481 0 : // Format connstr
482 0 : let id = connstr.clone();
483 0 : let connstr = format!("postgresql://no_user@{}", connstr);
484 0 : let options = format!(
485 0 : "-c timeline_id={} tenant_id={}",
486 0 : pspec.timeline_id, pspec.tenant_id
487 0 : );
488 0 :
489 0 : // Construct client
490 0 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
491 0 : config.options(&options);
492 0 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
493 0 : config.password(storage_auth_token);
494 0 : }
495 :
496 0 : (id, config)
497 0 : });
498 0 :
499 0 : // Create task set to query all safekeepers
500 0 : let mut tasks = FuturesUnordered::new();
501 0 : let quorum = sk_configs.len() / 2 + 1;
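// A strict majority of safekeepers (n/2 + 1) must respond for the
// check to decide whether they are already synced.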
502 0 : for (id, config) in sk_configs {
503 0 : let timeout = tokio::time::Duration::from_millis(100);
504 0 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
505 0 : tasks.push(tokio::spawn(task));
506 0 : }
507 :
508 : // Get a quorum of responses or errors
509 0 : let mut responses = Vec::new();
510 0 : let mut join_errors = Vec::new();
511 0 : let mut task_errors = Vec::new();
512 0 : let mut timeout_errors = Vec::new();
513 0 : while let Some(response) = tasks.next().await {
514 0 : match response {
515 0 : Ok(Ok(Ok(r))) => responses.push(r),
516 0 : Ok(Ok(Err(e))) => task_errors.push(e),
517 0 : Ok(Err(e)) => timeout_errors.push(e),
518 0 : Err(e) => join_errors.push(e),
519 : };
520 0 : if responses.len() >= quorum {
521 0 : break;
522 0 : }
523 0 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
524 0 : break;
525 0 : }
526 : }
527 :
528 : // In case of error, log and fail the check, but don't crash.
529 : // We're playing it safe because these errors could be transient
530 : // and we don't yet retry. Also being careful here allows us to
531 : // be backwards compatible with safekeepers that don't have the
532 : // TIMELINE_STATUS API yet.
533 0 : if responses.len() < quorum {
534 0 : error!(
535 0 : "failed sync safekeepers check {:?} {:?} {:?}",
536 : join_errors, task_errors, timeout_errors
537 : );
538 0 : return Ok(None);
539 0 : }
540 0 :
541 0 : Ok(check_if_synced(responses))
542 0 : }
543 :
544 : // Fast path for sync_safekeepers. If they're already synced we get the lsn
545 : // in one roundtrip. If not, we should do a full sync_safekeepers.
546 0 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
547 0 : let start_time = Utc::now();
548 0 :
549 0 : // Run actual work with new tokio runtime
550 0 : let rt = tokio::runtime::Builder::new_current_thread()
551 0 : .enable_all()
552 0 : .build()
553 0 : .expect("failed to create rt");
554 0 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
555 0 :
556 0 : // Record runtime
557 0 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
558 0 : .signed_duration_since(start_time)
559 0 : .to_std()
560 0 : .unwrap()
561 0 : .as_millis() as u64;
562 0 : result
563 0 : }
564 :
565 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
566 : // and return the reported LSN back to the caller.
567 : #[instrument(skip_all)]
568 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
569 : let start_time = Utc::now();
570 :
571 : let mut sync_handle = maybe_cgexec(&self.pgbin)
572 : .args(["--sync-safekeepers"])
573 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
574 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
575 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
576 : } else {
577 : vec![]
578 : })
579 : .stdout(Stdio::piped())
580 : .stderr(Stdio::piped())
581 : .spawn()
582 : .expect("postgres --sync-safekeepers failed to start");
583 : SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
584 :
585 : // `postgres --sync-safekeepers` will print all log output to stderr and
586 : // final LSN to stdout. So we leave stdout to collect LSN, while stderr logs
587 : // will be collected in a child thread.
588 : let stderr = sync_handle
589 : .stderr
590 : .take()
591 : .expect("stderr should be captured");
592 : let logs_handle = handle_postgres_logs(stderr);
593 :
594 : let sync_output = sync_handle
595 : .wait_with_output()
596 : .expect("postgres --sync-safekeepers failed");
597 : SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
598 :
599 : // Process has exited, so we can join the logs thread.
600 : let _ = logs_handle
601 : .join()
602 0 : .map_err(|e| tracing::error!("log thread panicked: {:?}", e));
603 :
604 : if !sync_output.status.success() {
605 : anyhow::bail!(
606 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
607 : sync_output.status,
608 : String::from_utf8(sync_output.stdout)
609 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
610 : );
611 : }
612 :
613 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
614 : .signed_duration_since(start_time)
615 : .to_std()
616 : .unwrap()
617 : .as_millis() as u64;
618 :
619 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
620 :
621 : Ok(lsn)
622 : }
623 :
624 : /// Do all the preparations like PGDATA directory creation, configuration,
625 : /// safekeepers sync, basebackup, etc.
626 : #[instrument(skip_all)]
627 : pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
628 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
629 : let spec = &pspec.spec;
630 : let pgdata_path = Path::new(&self.pgdata);
631 :
632 : // Remove/create an empty pgdata directory and put configuration there.
633 : self.create_pgdata()?;
634 : config::write_postgres_conf(
635 : &pgdata_path.join("postgresql.conf"),
636 : &pspec.spec,
637 : self.http_port,
638 : )?;
639 :
640 : // Syncing safekeepers is only safe with primary nodes: if a primary
641 : // is already connected it will be kicked out, so a secondary (standby)
642 : // cannot sync safekeepers.
643 : let lsn = match spec.mode {
644 : ComputeMode::Primary => {
645 : info!("checking if safekeepers are synced");
646 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
647 : lsn
648 : } else {
649 : info!("starting safekeepers syncing");
650 : self.sync_safekeepers(pspec.storage_auth_token.clone())
651 0 : .with_context(|| "failed to sync safekeepers")?
652 : };
653 : info!("safekeepers synced at LSN {}", lsn);
654 : lsn
655 : }
656 : ComputeMode::Static(lsn) => {
657 : info!("Starting read-only node at static LSN {}", lsn);
658 : lsn
659 : }
660 : ComputeMode::Replica => {
661 : info!("Initializing standby from latest Pageserver LSN");
662 : Lsn(0)
663 : }
664 : };
665 :
666 : info!(
667 : "getting basebackup@{} from pageserver {}",
668 : lsn, &pspec.pageserver_connstr
669 : );
670 0 : self.get_basebackup(compute_state, lsn).with_context(|| {
671 0 : format!(
672 0 : "failed to get basebackup@{} from pageserver {}",
673 0 : lsn, &pspec.pageserver_connstr
674 0 : )
675 0 : })?;
676 :
677 : // Update pg_hba.conf received with basebackup.
678 : update_pg_hba(pgdata_path)?;
679 :
680 : // Place pg_dynshmem under /dev/shm. This allows us to use
681 : // 'dynamic_shared_memory_type = mmap' so that the files are placed in
682 : // /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
683 : //
684 : // Why on earth don't we just stick to the 'posix' default, you might
685 : // ask. It turns out that making large allocations with 'posix' doesn't
686 : // work very well with autoscaling. The behavior we want is that:
687 : //
688 : // 1. You can make large DSM allocations, larger than the current RAM
689 : // size of the VM, without errors
690 : //
691 : // 2. If the allocated memory is really used, the VM is scaled up
692 : // automatically to accommodate that
693 : //
694 : // We try to make that possible by having swap in the VM. But with the
695 : // default 'posix' DSM implementation, we fail step 1, even when there's
696 : // plenty of swap available. PostgreSQL uses posix_fallocate() to create
697 : // the shmem segment, which is really just a file in /dev/shm in Linux,
698 : // but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
699 : // than available RAM.
700 : //
701 : // Using 'dynamic_shared_memory_type = mmap' works around that, because
702 : // the Postgres 'mmap' DSM implementation doesn't use
703 : // posix_fallocate(). Instead, it uses repeated calls to write(2) to
704 : // fill the file with zeros. It's weird that that differs between
705 : // 'posix' and 'mmap', but we take advantage of it. When the file is
706 : // filled slowly with write(2), the kernel allows it to grow larger, as
707 : // long as there's swap available.
708 : //
709 : // In short, using 'dynamic_shared_memory_type = mmap' allows a single DSM
710 : // segment to be larger than the currently available RAM. But we don't want to
711 : // store it in a real file, which the kernel would try to flush to disk, so we
712 : // symlink pg_dynshmem to /dev/shm.
713 : //
714 : // We don't set 'dynamic_shared_memory_type = mmap' here, we let the
715 : // control plane control that option. If 'mmap' is not used, this
716 : // symlink doesn't affect anything.
717 : //
718 : // See https://github.com/neondatabase/autoscaling/issues/800
719 : std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
720 : symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
721 :
722 : match spec.mode {
723 : ComputeMode::Primary => {}
724 : ComputeMode::Replica | ComputeMode::Static(..) => {
725 : add_standby_signal(pgdata_path)?;
726 : }
727 : }
728 :
729 : Ok(())
730 : }
731 :
732 : /// Start and stop a postgres process to warm up the VM for startup.
733 0 : pub fn prewarm_postgres(&self) -> Result<()> {
734 0 : info!("prewarming");
735 :
736 : // Create pgdata
737 0 : let pgdata = &format!("{}.warmup", self.pgdata);
738 0 : create_pgdata(pgdata)?;
739 :
740 : // Run initdb to completion
741 0 : info!("running initdb");
742 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
743 0 : Command::new(initdb_bin)
744 0 : .args(["--pgdata", pgdata])
745 0 : .output()
746 0 : .expect("cannot start initdb process");
747 :
748 : // Write conf
749 : use std::io::Write;
750 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
751 0 : let mut file = std::fs::File::create(conf_path)?;
752 0 : writeln!(file, "shared_buffers=65536")?;
753 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
754 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
755 :
756 : // Start postgres
757 0 : info!("starting postgres");
758 0 : let mut pg = maybe_cgexec(&self.pgbin)
759 0 : .args(["-D", pgdata])
760 0 : .spawn()
761 0 : .expect("cannot start postgres process");
762 0 :
763 0 : // Stop it when it's ready
764 0 : info!("waiting for postgres");
765 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
766 : // SIGQUIT orders postgres to exit immediately. We don't SIGKILL it, to
767 : // avoid leaving orphaned processes prowling around while the datadir is
768 : // wiped.
769 0 : let pm_pid = Pid::from_raw(pg.id() as i32);
770 0 : kill(pm_pid, Signal::SIGQUIT)?;
771 0 : info!("sent SIGQUIT signal");
772 0 : pg.wait()?;
773 0 : info!("done prewarming");
774 :
775 : // clean up
776 0 : let _ok = fs::remove_dir_all(pgdata);
777 0 : Ok(())
778 0 : }
779 :
780 : /// Start Postgres as a child process and manage DBs/roles.
781 : /// After that this will hang waiting on the postmaster process to exit.
782 : /// Returns a handle to the child process and a handle to the logs thread.
783 : #[instrument(skip_all)]
784 : pub fn start_postgres(
785 : &self,
786 : storage_auth_token: Option<String>,
787 : ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
788 : let pgdata_path = Path::new(&self.pgdata);
789 :
790 : // Run postgres as a child process.
791 : let mut pg = maybe_cgexec(&self.pgbin)
792 : .args(["-D", &self.pgdata])
793 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
794 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
795 : } else {
796 : vec![]
797 : })
798 : .stderr(Stdio::piped())
799 : .spawn()
800 : .expect("cannot start postgres process");
801 : PG_PID.store(pg.id(), Ordering::SeqCst);
802 :
803 : // Start a thread to collect logs from stderr.
804 : let stderr = pg.stderr.take().expect("stderr should be captured");
805 : let logs_handle = handle_postgres_logs(stderr);
806 :
807 : wait_for_postgres(&mut pg, pgdata_path)?;
808 :
809 : Ok((pg, logs_handle))
810 : }
811 :
812 : /// Do post-configuration of the already started Postgres. This function spawns a background thread to
813 : /// configure the database after applying the compute spec. Currently, it upgrades the neon extension
814 : /// version. In the future, it may upgrade all 3rd-party extensions.
815 : #[instrument(skip_all)]
816 : pub fn post_apply_config(&self) -> Result<()> {
817 : let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config"));
818 0 : thread::spawn(move || {
819 0 : let func = || {
820 0 : let mut client = conf.connect(NoTls)?;
821 0 : handle_neon_extension_upgrade(&mut client)
822 0 : .context("handle_neon_extension_upgrade")?;
823 0 : Ok::<_, anyhow::Error>(())
824 0 : };
825 0 : if let Err(err) = func() {
826 0 : error!("error while post_apply_config: {err:#}");
827 0 : }
828 0 : });
829 : Ok(())
830 : }
831 :
832 0 : pub fn get_conn_conf(&self, application_name: Option<&str>) -> postgres::Config {
833 0 : let mut conf = self.conn_conf.clone();
834 0 : if let Some(application_name) = application_name {
835 0 : conf.application_name(application_name);
836 0 : }
837 0 : conf
838 0 : }
839 :
840 0 : pub fn get_tokio_conn_conf(&self, application_name: Option<&str>) -> tokio_postgres::Config {
841 0 : let mut conf = self.tokio_conn_conf.clone();
842 0 : if let Some(application_name) = application_name {
843 0 : conf.application_name(application_name);
844 0 : }
845 0 : conf
846 0 : }
847 :
848 0 : pub async fn get_maintenance_client(
849 0 : conf: &tokio_postgres::Config,
850 0 : ) -> Result<tokio_postgres::Client> {
851 0 : let mut conf = conf.clone();
852 0 : conf.application_name("compute_ctl:apply_config");
853 :
854 0 : let (client, conn) = match conf.connect(NoTls).await {
855 : // If connection fails, it may be the old node with `zenith_admin` superuser.
856 : //
857 : // In this case we need to connect with the old `zenith_admin` name
858 : // and create a new user. We cannot simply rename the connected user,
859 : // but we can create a new one and grant it all privileges.
860 0 : Err(e) => match e.code() {
861 : Some(&SqlState::INVALID_PASSWORD)
862 : | Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
863 : // Connect with zenith_admin if cloud_admin could not authenticate
864 0 : info!(
865 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
866 : e
867 : );
868 0 : let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
869 0 : zenith_admin_conf.application_name("compute_ctl:apply_config");
870 0 : zenith_admin_conf.user("zenith_admin");
871 :
872 0 : let mut client =
873 0 : zenith_admin_conf.connect(NoTls)
874 0 : .context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
875 :
876 : // Disable forwarding so that users don't get a cloud_admin role
877 0 : let mut func = || {
878 0 : client.simple_query("SET neon.forward_ddl = false")?;
879 0 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
880 0 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
881 0 : Ok::<_, anyhow::Error>(())
882 0 : };
883 0 : func().context("apply_config setup cloud_admin")?;
884 :
885 0 : drop(client);
886 0 :
887 0 : // Reconnect with connstring with expected name
888 0 : conf.connect(NoTls).await?
889 : }
890 0 : _ => return Err(e.into()),
891 : },
892 0 : Ok((client, conn)) => (client, conn),
893 : };
894 :
895 0 : spawn(async move {
896 0 : if let Err(e) = conn.await {
897 0 : error!("maintenance client connection error: {}", e);
898 0 : }
899 0 : });
900 0 :
901 0 : // Disable DDL forwarding because control plane already knows about the roles/databases
902 0 : // we're about to modify.
903 0 : client
904 0 : .simple_query("SET neon.forward_ddl = false")
905 0 : .await
906 0 : .context("apply_config SET neon.forward_ddl = false")?;
907 :
908 0 : Ok(client)
909 0 : }
910 :
911 : /// Apply the spec to the running PostgreSQL instance.
912 : /// The caller can decide to run with multiple clients in parallel, or
913 : /// single mode. Either way, the commands executed will be the same, and
914 : /// only commands run in different databases are parallelized.
915 : #[instrument(skip_all)]
916 : pub fn apply_spec_sql(
917 : &self,
918 : spec: Arc<ComputeSpec>,
919 : conf: Arc<tokio_postgres::Config>,
920 : concurrency: usize,
921 : ) -> Result<()> {
922 : let rt = tokio::runtime::Builder::new_multi_thread()
923 : .enable_all()
924 : .build()?;
925 :
926 : info!("Applying config with max {} concurrency", concurrency);
927 : debug!("Config: {:?}", spec);
928 :
929 0 : rt.block_on(async {
930 : // Proceed with post-startup configuration. Note that the order of operations is important.
931 0 : let client = Self::get_maintenance_client(&conf).await?;
932 0 : let spec = spec.clone();
933 :
934 0 : let databases = get_existing_dbs_async(&client).await?;
935 0 : let roles = get_existing_roles_async(&client)
936 0 : .await?
937 0 : .into_iter()
938 0 : .map(|role| (role.name.clone(), role))
939 0 : .collect::<HashMap<String, Role>>();
940 0 :
941 0 : // Check if we need to drop subscriptions before starting the endpoint.
942 0 : //
943 0 : // It is important to do this operation exactly once when the endpoint starts on a new branch.
944 0 : // Otherwise, we may drop newly created subscriptions rather than just the inherited ones.
945 0 : //
946 0 : // We cannot rely only on the spec.drop_subscriptions_before_start flag,
947 0 : // because if for some reason the compute restarts inside the VM,
948 0 : // it will start again with the same spec and flag value.
949 0 : //
950 0 : // To handle this, we record the fact that the operation was performed
951 0 : // in the neon.drop_subscriptions_done table.
952 0 : // If the table does not exist, we assume that the operation was never performed, so we must do it.
953 0 : // If the table exists, we check whether the operation was performed on the current timeline.
954 0 : //
955 0 : let mut drop_subscriptions_done = false;
956 0 :
957 0 : if spec.drop_subscriptions_before_start {
958 0 : let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
959 0 : let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
960 0 :
961 0 : info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
962 :
963 : drop_subscriptions_done = match
964 0 : client.simple_query(&query).await {
965 0 : Ok(result) => {
966 0 : matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
967 : },
968 0 : Err(e) =>
969 0 : {
970 0 : match e.code() {
971 0 : Some(&SqlState::UNDEFINED_TABLE) => false,
972 : _ => {
973 : // We don't expect any other error here, except for the schema/table not existing
974 0 : error!("Error checking if drop subscription operation was already performed: {}", e);
975 0 : return Err(e.into());
976 : }
977 : }
978 : }
979 : }
980 0 : };
981 :
982 :
983 0 : let jwks_roles = Arc::new(
984 0 : spec.as_ref()
985 0 : .local_proxy_config
986 0 : .iter()
987 0 : .flat_map(|it| &it.jwks)
988 0 : .flatten()
989 0 : .flat_map(|setting| &setting.role_names)
990 0 : .cloned()
991 0 : .collect::<HashSet<_>>(),
992 0 : );
993 0 :
994 0 : let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
995 0 : roles,
996 0 : dbs: databases,
997 0 : }));
998 0 :
999 0 : // Apply the special pre-drop-database phase.
1000 0 : // NOTE: we use the code of the RunInEachDatabase phase for parallelism
1001 0 : // and connection management, but we don't really run it in *each* database,
1002 0 : // only in the databases we're about to drop.
1003 0 : info!("Applying PerDatabase (pre-dropdb) phase");
1004 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
1005 0 :
1006 0 : // Run the phase for each database that we're about to drop.
1007 0 : let db_processes = spec
1008 0 : .delta_operations
1009 0 : .iter()
1010 0 : .flatten()
1011 0 : .filter_map(move |op| {
1012 0 : if op.action.as_str() == "delete_db" {
1013 0 : Some(op.name.clone())
1014 : } else {
1015 0 : None
1016 : }
1017 0 : })
1018 0 : .map(|dbname| {
1019 0 : let spec = spec.clone();
1020 0 : let ctx = ctx.clone();
1021 0 : let jwks_roles = jwks_roles.clone();
1022 0 : let mut conf = conf.as_ref().clone();
1023 0 : let concurrency_token = concurrency_token.clone();
1024 0 : // We only need the dbname field for this phase, so set the other fields to dummy values
1025 0 : let db = DB::UserDB(Database {
1026 0 : name: dbname.clone(),
1027 0 : owner: "cloud_admin".to_string(),
1028 0 : options: None,
1029 0 : restrict_conn: false,
1030 0 : invalid: false,
1031 0 : });
1032 0 :
1033 0 : debug!("Applying per-database phases for Database {:?}", &db);
1034 :
1035 0 : match &db {
1036 0 : DB::SystemDB => {}
1037 0 : DB::UserDB(db) => {
1038 0 : conf.dbname(db.name.as_str());
1039 0 : }
1040 : }
1041 :
1042 0 : let conf = Arc::new(conf);
1043 0 : let fut = Self::apply_spec_sql_db(
1044 0 : spec.clone(),
1045 0 : conf,
1046 0 : ctx.clone(),
1047 0 : jwks_roles.clone(),
1048 0 : concurrency_token.clone(),
1049 0 : db,
1050 0 : [DropLogicalSubscriptions].to_vec(),
1051 0 : );
1052 0 :
1053 0 : Ok(spawn(fut))
1054 0 : })
1055 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
1056 :
1057 0 : for process in db_processes.into_iter() {
1058 0 : let handle = process?;
1059 0 : if let Err(e) = handle.await? {
1060 : // Handle the error case where the database does not exist.
1061 : // We do not check whether the DB exists or not in the deletion phase,
1062 : // so we shouldn't be strict about it in the pre-deletion cleanup either.
1063 0 : if e.to_string().contains("does not exist") {
1064 0 : warn!("Error dropping subscription: {}", e);
1065 : } else {
1066 0 : return Err(e);
1067 : }
1068 0 : };
1069 : }
1070 :
1071 0 : for phase in [
1072 0 : CreateSuperUser,
1073 0 : DropInvalidDatabases,
1074 0 : RenameRoles,
1075 0 : CreateAndAlterRoles,
1076 0 : RenameAndDeleteDatabases,
1077 0 : CreateAndAlterDatabases,
1078 0 : CreateSchemaNeon,
1079 : ] {
1080 0 : info!("Applying phase {:?}", &phase);
1081 0 : apply_operations(
1082 0 : spec.clone(),
1083 0 : ctx.clone(),
1084 0 : jwks_roles.clone(),
1085 0 : phase,
1086 0 : || async { Ok(&client) },
1087 0 : )
1088 0 : .await?;
1089 : }
1090 :
1091 0 : info!("Applying RunInEachDatabase2 phase");
1092 0 : let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
1093 0 :
1094 0 : let db_processes = spec
1095 0 : .cluster
1096 0 : .databases
1097 0 : .iter()
1098 0 : .map(|db| DB::new(db.clone()))
1099 0 : // include
1100 0 : .chain(once(DB::SystemDB))
1101 0 : .map(|db| {
1102 0 : let spec = spec.clone();
1103 0 : let ctx = ctx.clone();
1104 0 : let jwks_roles = jwks_roles.clone();
1105 0 : let mut conf = conf.as_ref().clone();
1106 0 : let concurrency_token = concurrency_token.clone();
1107 0 : let db = db.clone();
1108 0 :
1109 0 : debug!("Applying per-database phases for Database {:?}", &db);
1110 :
1111 0 : match &db {
1112 0 : DB::SystemDB => {}
1113 0 : DB::UserDB(db) => {
1114 0 : conf.dbname(db.name.as_str());
1115 0 : }
1116 : }
1117 :
1118 0 : let conf = Arc::new(conf);
1119 0 : let mut phases = vec![
1120 0 : DeleteDBRoleReferences,
1121 0 : ChangeSchemaPerms,
1122 0 : HandleAnonExtension,
1123 0 : ];
1124 0 :
1125 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
1126 0 : info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
1127 0 : phases.push(DropLogicalSubscriptions);
1128 0 : }
1129 :
1130 0 : let fut = Self::apply_spec_sql_db(
1131 0 : spec.clone(),
1132 0 : conf,
1133 0 : ctx.clone(),
1134 0 : jwks_roles.clone(),
1135 0 : concurrency_token.clone(),
1136 0 : db,
1137 0 : phases,
1138 0 : );
1139 0 :
1140 0 : Ok(spawn(fut))
1141 0 : })
1142 0 : .collect::<Vec<Result<_, anyhow::Error>>>();
1143 :
1144 0 : for process in db_processes.into_iter() {
1145 0 : let handle = process?;
1146 0 : handle.await??;
1147 : }
1148 :
1149 0 : let mut phases = vec![
1150 0 : HandleOtherExtensions,
1151 0 : HandleNeonExtension, // This step depends on CreateSchemaNeon
1152 0 : CreateAvailabilityCheck,
1153 0 : DropRoles,
1154 0 : ];
1155 0 :
1156 0 : // This step depends on CreateSchemaNeon
1157 0 : if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
1158 0 : info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
1159 0 : phases.push(FinalizeDropLogicalSubscriptions);
1160 0 : }
1161 :
1162 0 : for phase in phases {
1163 0 : debug!("Applying phase {:?}", &phase);
1164 0 : apply_operations(
1165 0 : spec.clone(),
1166 0 : ctx.clone(),
1167 0 : jwks_roles.clone(),
1168 0 : phase,
1169 0 : || async { Ok(&client) },
1170 0 : )
1171 0 : .await?;
1172 : }
1173 :
1174 0 : Ok::<(), anyhow::Error>(())
1175 0 : })?;
1176 :
1177 : Ok(())
1178 : }
1179 :
1180 : /// Apply SQL migrations of the RunInEachDatabase phase.
1181 : ///
1182 : /// May opt to not connect to databases that don't have any scheduled
1183 : /// operations. The function is concurrency-controlled with the provided
1184 : /// semaphore. The caller has to make sure the semaphore isn't exhausted.
1185 0 : async fn apply_spec_sql_db(
1186 0 : spec: Arc<ComputeSpec>,
1187 0 : conf: Arc<tokio_postgres::Config>,
1188 0 : ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
1189 0 : jwks_roles: Arc<HashSet<String>>,
1190 0 : concurrency_token: Arc<tokio::sync::Semaphore>,
1191 0 : db: DB,
1192 0 : subphases: Vec<PerDatabasePhase>,
1193 0 : ) -> Result<()> {
1194 0 : let _permit = concurrency_token.acquire().await?;
1195 :
1196 0 : let mut client_conn = None;
1197 :
1198 0 : for subphase in subphases {
1199 0 : apply_operations(
1200 0 : spec.clone(),
1201 0 : ctx.clone(),
1202 0 : jwks_roles.clone(),
1203 0 : RunInEachDatabase {
1204 0 : db: db.clone(),
1205 0 : subphase,
1206 0 : },
1207 0 : // Only connect if apply_operation actually wants a connection.
1208 0 : // It's quite possible this database doesn't need any queries,
1209 0 : // so by not connecting we save time and effort connecting to
1210 0 : // that database.
1211 0 : || async {
1212 0 : if client_conn.is_none() {
1213 0 : let db_client = Self::get_maintenance_client(&conf).await?;
1214 0 : client_conn.replace(db_client);
1215 0 : }
1216 0 : let client = client_conn.as_ref().unwrap();
1217 0 : Ok(client)
1218 0 : },
1219 0 : )
1220 0 : .await?;
1221 : }
1222 :
1223 0 : drop(client_conn);
1224 0 :
1225 0 : Ok::<(), anyhow::Error>(())
1226 0 : }
1227 :
1228 : /// Choose how many concurrent connections to use for applying the spec changes.
1229 0 : pub fn max_service_connections(
1230 0 : &self,
1231 0 : compute_state: &ComputeState,
1232 0 : spec: &ComputeSpec,
1233 0 : ) -> usize {
1234 0 : // If the cluster is in the Init state we don't have to deal with user connections,
1235 0 : // and can thus use all `max_connections` connection slots. However, that's generally
1236 0 : // not very efficient, so we still limit it to a smaller number.
1237 0 : if compute_state.status == ComputeStatus::Init {
1238 : // If the settings contain 'max_connections', use that as template
1239 0 : if let Some(config) = spec.cluster.settings.find("max_connections") {
1240 0 : config.parse::<usize>().ok()
1241 : } else {
1242 : // Otherwise, try to find the setting in the postgresql_conf string
1243 0 : spec.cluster
1244 0 : .postgresql_conf
1245 0 : .iter()
1246 0 : .flat_map(|conf| conf.split("\n"))
1247 0 : .filter_map(|line| {
1248 0 : if !line.contains("max_connections") {
1249 0 : return None;
1250 0 : }
1251 :
1252 0 : let (key, value) = line.split_once("=")?;
1253 0 : let key = key
1254 0 : .trim_start_matches(char::is_whitespace)
1255 0 : .trim_end_matches(char::is_whitespace);
1256 0 :
1257 0 : let value = value
1258 0 : .trim_start_matches(char::is_whitespace)
1259 0 : .trim_end_matches(char::is_whitespace);
1260 0 :
1261 0 : if key != "max_connections" {
1262 0 : return None;
1263 0 : }
1264 0 :
1265 0 : value.parse::<usize>().ok()
1266 0 : })
1267 0 : .next()
1268 : }
1269 : // If max_connections is present, use at most 1/3rd of that.
1270 : // When max_connections is lower than 30, try to use at least 10 connections, but
1271 : // never more than max_connections.
1272 0 : .map(|limit| match limit {
1273 0 : 0..10 => limit,
1274 0 : 10..30 => 10,
1275 0 : 30.. => limit / 3,
1276 0 : })
1277 0 : // If we didn't find max_connections, default to 10 concurrent connections.
1278 0 : .unwrap_or(10)
1279 : } else {
1280 : // state == Running
1281 : // Because the cluster is already in the Running state, we should assume users are
1282 : // already connected to the cluster, and high concurrency could negatively
1283 : // impact user connectivity. Therefore, we can limit concurrency to the number of
1284 : // reserved superuser connections, which users wouldn't be able to use anyway.
1285 0 : spec.cluster
1286 0 : .settings
1287 0 : .find("superuser_reserved_connections")
1288 0 : .iter()
1289 0 : .filter_map(|val| val.parse::<usize>().ok())
1290 0 : .map(|val| if val > 1 { val - 1 } else { 1 })
1291 0 : .last()
1292 0 : .unwrap_or(3)
1293 : }
1294 0 : }
1295 :
1296 : /// Do initial configuration of the already started Postgres.
1297 : #[instrument(skip_all)]
1298 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
1299 : let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
1300 :
1301 : let conf = Arc::new(conf);
1302 : let spec = Arc::new(
1303 : compute_state
1304 : .pspec
1305 : .as_ref()
1306 : .expect("spec must be set")
1307 : .spec
1308 : .clone(),
1309 : );
1310 :
1311 : let max_concurrent_connections = self.max_service_connections(compute_state, &spec);
1312 :
1313 : // Merge-apply spec & changes to PostgreSQL state.
1314 : self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?;
1315 :
1316 : if let Some(ref local_proxy) = &spec.clone().local_proxy_config {
1317 : info!("configuring local_proxy");
1318 : local_proxy::configure(local_proxy).context("apply_config local_proxy")?;
1319 : }
1320 :
1321 : // Run migrations separately to not hold up cold starts
1322 0 : thread::spawn(move || {
1323 0 : let conf = conf.as_ref().clone();
1324 0 : let mut conf = postgres::config::Config::from(conf);
1325 0 : conf.application_name("compute_ctl:migrations");
1326 0 :
1327 0 : match conf.connect(NoTls) {
1328 0 : Ok(mut client) => {
1329 0 : if let Err(e) = handle_migrations(&mut client) {
1330 0 : error!("Failed to run migrations: {}", e);
1331 0 : }
1332 : }
1333 0 : Err(e) => {
1334 0 : error!(
1335 0 : "Failed to connect to the compute for running migrations: {}",
1336 : e
1337 : );
1338 : }
1339 : };
1340 0 : });
1341 :
1342 : Ok::<(), anyhow::Error>(())
1343 : }
1344 :
1345 : // Wrapper around `pg_ctl reload`. Note that right now we don't use
1346 : // `pg_ctl` for start / stop.
1347 : #[instrument(skip_all)]
1348 : fn pg_reload_conf(&self) -> Result<()> {
1349 : let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
1350 : Command::new(pgctl_bin)
1351 : .args(["reload", "-D", &self.pgdata])
1352 : .output()
1353 : .expect("cannot run pg_ctl process");
1354 : Ok(())
1355 : }
1356 :
1357 : /// Similar to `apply_config()`, but performs a slightly different sequence of operations,
1358 : /// as it's used to reconfigure a previously started and configured Postgres node.
1359 : #[instrument(skip_all)]
1360 : pub fn reconfigure(&self) -> Result<()> {
1361 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
1362 :
1363 : if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
1364 : info!("tuning pgbouncer");
1365 :
1366 : let rt = tokio::runtime::Builder::new_current_thread()
1367 : .enable_all()
1368 : .build()
1369 : .expect("failed to create rt");
1370 :
1371 : // Spawn a thread to do the tuning,
1372 : // so that we don't block the main thread that starts Postgres.
1373 : let pgbouncer_settings = pgbouncer_settings.clone();
1374 0 : let _handle = thread::spawn(move || {
1375 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
1376 0 : if let Err(err) = res {
1377 0 : error!("error while tuning pgbouncer: {err:?}");
1378 0 : }
1379 0 : });
1380 : }
1381 :
1382 : if let Some(ref local_proxy) = spec.local_proxy_config {
1383 : info!("configuring local_proxy");
1384 :
1385 : // Spawn a thread to do the configuration,
1386 : // so that we don't block the main thread that starts Postgres.
1387 : let local_proxy = local_proxy.clone();
1388 0 : let _handle = Some(thread::spawn(move || {
1389 0 : if let Err(err) = local_proxy::configure(&local_proxy) {
1390 0 : error!("error while configuring local_proxy: {err:?}");
1391 0 : }
1392 0 : }));
1393 : }
1394 :
1395 : // Write new config
1396 : let pgdata_path = Path::new(&self.pgdata);
1397 : let postgresql_conf_path = pgdata_path.join("postgresql.conf");
1398 : config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?;
1399 :
1400 : let max_concurrent_connections = spec.reconfigure_concurrency;
1401 :
1402 : // Temporarily reset max_cluster_size in config
1403 : // to avoid the possibility of hitting the limit, while we are reconfiguring:
1404 : // creating new extensions, roles, etc.
1405 0 : config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
1406 0 : self.pg_reload_conf()?;
1407 :
1408 0 : if spec.mode == ComputeMode::Primary {
1409 0 : let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap();
1410 0 : conf.application_name("apply_config");
1411 0 : let conf = Arc::new(conf);
1412 0 :
1413 0 : let spec = Arc::new(spec.clone());
1414 0 :
1415 0 : self.apply_spec_sql(spec, conf, max_concurrent_connections)?;
1416 0 : }
1417 :
1418 0 : Ok(())
1419 0 : })?;
1420 :
1421 : self.pg_reload_conf()?;
1422 :
1423 : let unknown_op = "unknown".to_string();
1424 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
1425 : info!(
1426 : "finished reconfiguration of compute node for operation {}",
1427 : op_id
1428 : );
1429 :
1430 : Ok(())
1431 : }
1432 :
1433 : #[instrument(skip_all)]
1434 : pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
1435 : let compute_state = self.state.lock().unwrap().clone();
1436 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
1437 : info!(
1438 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
1439 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
1440 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
1441 : pspec.tenant_id,
1442 : pspec.timeline_id,
1443 : );
1444 :
1445 : // tune pgbouncer
1446 : if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
1447 : info!("tuning pgbouncer");
1448 :
1449 : let rt = tokio::runtime::Builder::new_current_thread()
1450 : .enable_all()
1451 : .build()
1452 : .expect("failed to create rt");
1453 :
1454 : // Spawn a thread to do the tuning,
1455 : // so that we don't block the main thread that starts Postgres.
1456 : let pgbouncer_settings = pgbouncer_settings.clone();
1457 0 : let _handle = thread::spawn(move || {
1458 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
1459 0 : if let Err(err) = res {
1460 0 : error!("error while tuning pgbouncer: {err:?}");
1461 0 : }
1462 0 : });
1463 : }
1464 :
1465 : if let Some(local_proxy) = &pspec.spec.local_proxy_config {
1466 : info!("configuring local_proxy");
1467 :
1468 : // Spawn a thread to do the configuration,
1469 : // so that we don't block the main thread that starts Postgres.
1470 : let local_proxy = local_proxy.clone();
1471 0 : let _handle = thread::spawn(move || {
1472 0 : if let Err(err) = local_proxy::configure(&local_proxy) {
1473 0 : error!("error while configuring local_proxy: {err:?}");
1474 0 : }
1475 0 : });
1476 : }
1477 :
1478 : info!(
1479 : "start_compute spec.remote_extensions {:?}",
1480 : pspec.spec.remote_extensions
1481 : );
1482 :
1483 : // This part is sync, because we need to download
1484 : // remote shared_preload_libraries (if any) before postgres starts
1485 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
1486 : // First, create control files for all available extensions
1487 : extension_server::create_control_files(remote_extensions, &self.pgbin);
1488 :
1489 : let library_load_start_time = Utc::now();
1490 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
1491 :
1492 : let library_load_time = Utc::now()
1493 : .signed_duration_since(library_load_start_time)
1494 : .to_std()
1495 : .unwrap()
1496 : .as_millis() as u64;
1497 : let mut state = self.state.lock().unwrap();
1498 : state.metrics.load_ext_ms = library_load_time;
1499 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
1500 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
1501 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
1502 : info!(
1503 : "Loading shared_preload_libraries took {:?}ms",
1504 : library_load_time
1505 : );
1506 : info!("{:?}", remote_ext_metrics);
1507 : }
1508 :
1509 : self.prepare_pgdata(&compute_state)?;
1510 :
1511 : let start_time = Utc::now();
1512 : let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
1513 :
1514 : let config_time = Utc::now();
1515 : if pspec.spec.mode == ComputeMode::Primary {
1516 : if !pspec.spec.skip_pg_catalog_updates {
1517 : let pgdata_path = Path::new(&self.pgdata);
1518 : // temporarily reset max_cluster_size in config
1519 : // to avoid the possibility of hitting the limit, while we are applying config:
1520 : // creating new extensions, roles, etc...
1521 : config::with_compute_ctl_tmp_override(
1522 : pgdata_path,
1523 : "neon.max_cluster_size=-1",
1524 0 : || {
1525 0 : self.pg_reload_conf()?;
1526 :
1527 0 : self.apply_config(&compute_state)?;
1528 :
1529 0 : Ok(())
1530 0 : },
1531 : )?;
1532 :
1533 : let postgresql_conf_path = pgdata_path.join("postgresql.conf");
1534 : if config::line_in_file(
1535 : &postgresql_conf_path,
1536 : "neon.disable_logical_replication_subscribers=false",
1537 : )? {
1538 : info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
1539 : }
1540 : self.pg_reload_conf()?;
1541 : }
1542 : self.post_apply_config()?;
1543 :
1544 : let conf = self.get_conn_conf(None);
1545 0 : thread::spawn(move || {
1546 0 : let res = get_installed_extensions(conf);
1547 0 : match res {
1548 0 : Ok(extensions) => {
1549 0 : info!(
1550 0 : "[NEON_EXT_STAT] {}",
1551 0 : serde_json::to_string(&extensions)
1552 0 : .expect("failed to serialize extensions list")
1553 : );
1554 : }
1555 0 : Err(err) => error!("could not get installed extensions: {err:?}"),
1556 : }
1557 0 : });
1558 : }
1559 :
1560 : let startup_end_time = Utc::now();
1561 : {
1562 : let mut state = self.state.lock().unwrap();
1563 : state.metrics.start_postgres_ms = config_time
1564 : .signed_duration_since(start_time)
1565 : .to_std()
1566 : .unwrap()
1567 : .as_millis() as u64;
1568 : state.metrics.config_ms = startup_end_time
1569 : .signed_duration_since(config_time)
1570 : .to_std()
1571 : .unwrap()
1572 : .as_millis() as u64;
1573 : state.metrics.total_startup_ms = startup_end_time
1574 : .signed_duration_since(compute_state.start_time)
1575 : .to_std()
1576 : .unwrap()
1577 : .as_millis() as u64;
1578 : }
1579 : self.set_status(ComputeStatus::Running);
1580 :
1581 : info!(
1582 : "finished configuration of compute for project {}",
1583 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
1584 : );
1585 :
1586 : // Log metrics so that we can search for slow operations in logs
1587 : let metrics = {
1588 : let state = self.state.lock().unwrap();
1589 : state.metrics.clone()
1590 : };
1591 : info!(?metrics, "compute start finished");
1592 :
1593 : Ok(pg_process)
1594 : }
1595 :
1596 :     /// Update `last_active` in the shared state, but only if the new value is more recent.
1597 0 : pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
1598 0 : let mut state = self.state.lock().unwrap();
1599 0 : // NB: `Some(<DateTime>)` is always greater than `None`.
1600 0 : if last_active > state.last_active {
1601 0 : state.last_active = last_active;
1602 0 : debug!("set the last compute activity time to: {:?}", last_active);
1603 0 : }
1604 0 : }
1605 :
1606 : // Look for core dumps and collect backtraces.
1607 : //
1608 :     // EKS worker nodes have the following core dump settings:
1609 : // /proc/sys/kernel/core_pattern -> core
1610 : // /proc/sys/kernel/core_uses_pid -> 1
1611 : // ulimit -c -> unlimited
1612 :     // which results in core dumps being written to the Postgres data directory as core.<pid>.
1613 :     //
1614 :     // Use that as the default location and pattern, except on macOS, where core dumps are
1615 :     // written to the /cores/ directory by default.
1616 : //
1617 : // With default Linux settings, the core dump file is called just "core", so check for
1618 : // that too.
1619 0 : pub fn check_for_core_dumps(&self) -> Result<()> {
1620 0 : let core_dump_dir = match std::env::consts::OS {
1621 0 : "macos" => Path::new("/cores/"),
1622 0 : _ => Path::new(&self.pgdata),
1623 : };
1624 :
1625 : // Collect core dump paths if any
1626 0 : info!("checking for core dumps in {}", core_dump_dir.display());
1627 0 : let files = fs::read_dir(core_dump_dir)?;
1628 0 : let cores = files.filter_map(|entry| {
1629 0 : let entry = entry.ok()?;
1630 :
1631 0 : let is_core_dump = match entry.file_name().to_str()? {
1632 0 : n if n.starts_with("core.") => true,
1633 0 : "core" => true,
1634 0 : _ => false,
1635 : };
1636 0 : if is_core_dump {
1637 0 : Some(entry.path())
1638 : } else {
1639 0 : None
1640 : }
1641 0 : });
1642 :
1643 : // Print backtrace for each core dump
1644 0 : for core_path in cores {
1645 0 : warn!(
1646 0 : "core dump found: {}, collecting backtrace",
1647 0 : core_path.display()
1648 : );
1649 :
1650 :             // Try gdb first
1651 0 : let backtrace = Command::new("gdb")
1652 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
1653 0 : .arg(&core_path)
1654 0 : .output();
1655 :
1656 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
1657 0 : let backtrace = match backtrace {
1658 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
1659 0 : warn!("cannot find gdb, trying lldb");
1660 0 : Command::new("lldb")
1661 0 : .arg("-c")
1662 0 : .arg(&core_path)
1663 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
1664 0 : .output()
1665 : }
1666 0 : _ => backtrace,
1667 0 : }?;
1668 :
1669 0 : warn!(
1670 0 : "core dump backtrace: {}",
1671 0 : String::from_utf8_lossy(&backtrace.stdout)
1672 : );
1673 0 : warn!(
1674 0 : "debugger stderr: {}",
1675 0 : String::from_utf8_lossy(&backtrace.stderr)
1676 : );
1677 : }
1678 :
1679 0 : Ok(())
1680 0 : }
1681 :
1682 : /// Select `pg_stat_statements` data and return it as a stringified JSON
1683 0 : pub async fn collect_insights(&self) -> String {
1684 0 : let mut result_rows: Vec<String> = Vec::new();
1685 0 : let conf = self.get_tokio_conn_conf(Some("compute_ctl:collect_insights"));
1686 0 : let connect_result = conf.connect(NoTls).await;
1687 0 : let (client, connection) = connect_result.unwrap();
1688 0 : tokio::spawn(async move {
1689 0 : if let Err(e) = connection.await {
1690 0 : eprintln!("connection error: {}", e);
1691 0 : }
1692 0 : });
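     :         // Top 100 statements by combined mean execution and planning time,
     :         // excluding queries issued by cloud_admin itself.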
1693 0 : let result = client
1694 0 : .simple_query(
1695 0 : "SELECT
1696 0 : row_to_json(pg_stat_statements)
1697 0 : FROM
1698 0 : pg_stat_statements
1699 0 : WHERE
1700 0 : userid != 'cloud_admin'::regrole::oid
1701 0 : ORDER BY
1702 0 : (mean_exec_time + mean_plan_time) DESC
1703 0 : LIMIT 100",
1704 0 : )
1705 0 : .await;
1706 :
1707 0 : if let Ok(raw_rows) = result {
1708 0 : for message in raw_rows.iter() {
1709 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
1710 0 : if let Some(json) = row.get(0) {
1711 0 : result_rows.push(json.to_string());
1712 0 : }
1713 0 : }
1714 : }
1715 :
1716 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
1717 : } else {
1718 0 :             "{\"pg_stat_statements\": []}".to_string()
1719 : }
1720 0 : }
1721 :
1722 :     /// Download an extension archive, unpack it, and place the files in the correct locations.
1723 0 : pub async fn download_extension(
1724 0 : &self,
1725 0 : real_ext_name: String,
1726 0 : ext_path: RemotePath,
1727 0 : ) -> Result<u64, DownloadError> {
1728 0 : let ext_remote_storage =
1729 0 : self.ext_remote_storage
1730 0 : .as_ref()
1731 0 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
1732 0 : "Remote extensions storage is not configured",
1733 0 : )))?;
1734 :
1735 0 : let ext_archive_name = ext_path.object_name().expect("bad path");
1736 0 :
1737 0 : let mut first_try = false;
1738 0 : if !self
1739 0 : .ext_download_progress
1740 0 : .read()
1741 0 : .expect("lock err")
1742 0 : .contains_key(ext_archive_name)
1743 0 : {
1744 0 : self.ext_download_progress
1745 0 : .write()
1746 0 : .expect("lock err")
1747 0 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
1748 0 : first_try = true;
1749 0 : }
1750 0 : let (download_start, download_completed) =
1751 0 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
1752 0 : let start_time_delta = Utc::now()
1753 0 : .signed_duration_since(download_start)
1754 0 : .to_std()
1755 0 : .unwrap()
1756 0 : .as_millis() as u64;
1757 :
1758 :         // How long to wait for an extension download that was started by another process
1759 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
1760 :
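     :         // Three cases follow:
     :         //  1. the archive was already downloaded -> return immediately;
     :         //  2. another task started the download less than HANG_TIMEOUT ms ago ->
     :         //     poll the shared progress map every 500 ms until it finishes;
     :         //  3. otherwise -> download the archive ourselves further below.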
1761 0 : if download_completed {
1762 0 : info!("extension already downloaded, skipping re-download");
1763 0 : return Ok(0);
1764 0 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
1765 0 :             info!("download {ext_archive_name} already started by another process, waiting until completion or timeout");
1766 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
1767 : loop {
1768 0 : info!("waiting for download");
1769 0 : interval.tick().await;
1770 0 : let (_, download_completed_now) =
1771 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
1772 0 : if download_completed_now {
1773 0 : info!("download finished by whoever else downloaded it");
1774 0 : return Ok(0);
1775 0 : }
1776 : }
1777 : // NOTE: the above loop will get terminated
1778 : // based on the timeout of the download function
1779 0 : }
1780 0 :
1781 0 :         // If the extension hasn't been downloaded before, or the previous
1782 0 :         // download attempt started at least HANG_TIMEOUT ms ago,
1783 0 :         // then try to download it here.
1784 0 : info!("downloading new extension {ext_archive_name}");
1785 :
1786 0 : let download_size = extension_server::download_extension(
1787 0 : &real_ext_name,
1788 0 : &ext_path,
1789 0 : ext_remote_storage,
1790 0 : &self.pgbin,
1791 0 : )
1792 0 : .await
1793 0 : .map_err(DownloadError::Other);
1794 0 :
1795 0 : if download_size.is_ok() {
1796 0 : self.ext_download_progress
1797 0 : .write()
1798 0 : .expect("bad lock")
1799 0 : .insert(ext_archive_name.to_string(), (download_start, true));
1800 0 : }
1801 :
1802 0 : download_size
1803 0 : }
1804 :
1805 0 : pub async fn set_role_grants(
1806 0 : &self,
1807 0 : db_name: &PgIdent,
1808 0 : schema_name: &PgIdent,
1809 0 : privileges: &[Privilege],
1810 0 : role_name: &PgIdent,
1811 0 : ) -> Result<()> {
1812 : use tokio_postgres::NoTls;
1813 :
1814 0 : let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:set_role_grants"));
1815 0 : conf.dbname(db_name);
1816 :
1817 0 : let (db_client, conn) = conf
1818 0 : .connect(NoTls)
1819 0 : .await
1820 0 : .context("Failed to connect to the database")?;
1821 0 : tokio::spawn(conn);
1822 0 :
1823 0 : // TODO: support other types of grants apart from schemas?
1824 0 : let query = format!(
1825 0 : "GRANT {} ON SCHEMA {} TO {}",
1826 0 : privileges
1827 0 : .iter()
1828 0 :                 // Privileges are not quoted, as they are part of the command
1829 0 :                 // and are already sanitized, so this is OK.
1830 0 : .map(|p| p.as_str())
1831 0 : .collect::<Vec<&'static str>>()
1832 0 : .join(", "),
1833 0 : // quote the schema and role name as identifiers to sanitize them.
1834 0 : schema_name.pg_quote(),
1835 0 : role_name.pg_quote(),
1836 0 : );
1837 0 : db_client
1838 0 : .simple_query(&query)
1839 0 : .await
1840 0 : .with_context(|| format!("Failed to execute query: {}", query))?;
1841 :
1842 0 : Ok(())
1843 0 : }
1844 :
1845 0 : pub async fn install_extension(
1846 0 : &self,
1847 0 : ext_name: &PgIdent,
1848 0 : db_name: &PgIdent,
1849 0 : ext_version: ExtVersion,
1850 0 : ) -> Result<ExtVersion> {
1851 : use tokio_postgres::NoTls;
1852 :
1853 0 : let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:install_extension"));
1854 0 : conf.dbname(db_name);
1855 :
1856 0 : let (db_client, conn) = conf
1857 0 : .connect(NoTls)
1858 0 : .await
1859 0 : .context("Failed to connect to the database")?;
1860 0 : tokio::spawn(conn);
1861 0 :
1862 0 : let version_query = "SELECT extversion FROM pg_extension WHERE extname = $1";
1863 0 : let version: Option<ExtVersion> = db_client
1864 0 : .query_opt(version_query, &[&ext_name])
1865 0 : .await
1866 0 : .with_context(|| format!("Failed to execute query: {}", version_query))?
1867 0 : .map(|row| row.get(0));
1868 0 :
1869 0 : // sanitize the inputs as postgres idents.
1870 0 : let ext_name: String = ext_name.pg_quote();
1871 0 : let quoted_version: String = ext_version.pg_quote();
1872 :
1873 0 : if let Some(installed_version) = version {
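     :         // If the requested version is already installed there is nothing to do;
     :         // otherwise update the existing extension or create it at that version.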
1874 0 : if installed_version == ext_version {
1875 0 : return Ok(installed_version);
1876 0 : }
1877 0 : let query = format!("ALTER EXTENSION {ext_name} UPDATE TO {quoted_version}");
1878 0 : db_client
1879 0 : .simple_query(&query)
1880 0 : .await
1881 0 : .with_context(|| format!("Failed to execute query: {}", query))?;
1882 : } else {
1883 0 : let query =
1884 0 : format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
1885 0 : db_client
1886 0 : .simple_query(&query)
1887 0 : .await
1888 0 : .with_context(|| format!("Failed to execute query: {}", query))?;
1889 : }
1890 :
1891 0 : Ok(ext_version)
1892 0 : }
1893 :
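     :     // NB: `#[tokio::main]` turns this async fn into a blocking call; it is invoked
     :     // synchronously from `start_compute` before Postgres starts.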
1894 : #[tokio::main]
1895 0 : pub async fn prepare_preload_libraries(
1896 0 : &self,
1897 0 : spec: &ComputeSpec,
1898 0 : ) -> Result<RemoteExtensionMetrics> {
1899 0 : if self.ext_remote_storage.is_none() {
1900 0 : return Ok(RemoteExtensionMetrics {
1901 0 : num_ext_downloaded: 0,
1902 0 : largest_ext_size: 0,
1903 0 : total_ext_download_size: 0,
1904 0 : });
1905 0 : }
1906 0 : let remote_extensions = spec
1907 0 : .remote_extensions
1908 0 : .as_ref()
1909 0 : .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
1910 0 :
1911 0 : info!("parse shared_preload_libraries from spec.cluster.settings");
1912 0 : let mut libs_vec = Vec::new();
1913 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1914 0 : libs_vec = libs
1915 0 : .split(&[',', '\'', ' '])
1916 0 : .filter(|s| *s != "neon" && !s.is_empty())
1917 0 : .map(str::to_string)
1918 0 : .collect();
1919 0 : }
1920 0 : info!("parse shared_preload_libraries from provided postgresql.conf");
1921 0 :
1922 0 : // that is used in neon_local and python tests
1923 0 :         // This path is used by neon_local and in Python tests.
1924 0 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1925 0 : let mut shared_preload_libraries_line = "";
1926 0 : for line in conf_lines {
1927 0 : if line.starts_with("shared_preload_libraries") {
1928 0 : shared_preload_libraries_line = line;
1929 0 : }
1930 0 : }
1931 0 : let mut preload_libs_vec = Vec::new();
1932 0 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1933 0 : preload_libs_vec = libs
1934 0 : .split(&[',', '\'', ' '])
1935 0 : .filter(|s| *s != "neon" && !s.is_empty())
1936 0 : .map(str::to_string)
1937 0 : .collect();
1938 0 : }
1939 0 : libs_vec.extend(preload_libs_vec);
1940 0 : }
1941 0 :
1942 0 : // Don't try to download libraries that are not in the index.
1943 0 : // Assume that they are already present locally.
1944 0 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1945 0 :
1946 0 :         info!("Downloading shared_preload_libraries: {:?}", &libs_vec);
1947 0 :
1948 0 : let mut download_tasks = Vec::new();
1949 0 : for library in &libs_vec {
1950 0 : let (ext_name, ext_path) =
1951 0 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1952 0 : download_tasks.push(self.download_extension(ext_name, ext_path));
1953 0 : }
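     :         // Download all libraries concurrently; a failed download is logged below
     :         // but does not fail the whole preload step.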
1954 0 : let results = join_all(download_tasks).await;
1955 0 :
1956 0 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1957 0 : num_ext_downloaded: 0,
1958 0 : largest_ext_size: 0,
1959 0 : total_ext_download_size: 0,
1960 0 : };
1961 0 : for result in results {
1962 0 : let download_size = match result {
1963 0 : Ok(res) => {
1964 0 : remote_ext_metrics.num_ext_downloaded += 1;
1965 0 : res
1966 0 : }
1967 0 : Err(err) => {
1968 0 : // if we failed to download an extension, we don't want to fail the whole
1969 0 : // process, but we do want to log the error
1970 0 : error!("Failed to download extension: {}", err);
1971 0 : 0
1972 0 : }
1973 0 : };
1974 0 :
1975 0 : remote_ext_metrics.largest_ext_size =
1976 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1977 0 : remote_ext_metrics.total_ext_download_size += download_size;
1978 0 : }
1979 0 : Ok(remote_ext_metrics)
1980 0 : }
1981 :
1982 :     /// Waits until the current thread receives a state change notification and
1983 :     /// the pageserver connection string has changed.
1984 : ///
1985 : /// The operation will time out after a specified duration.
1986 0 : pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
1987 0 : let state = self.state.lock().unwrap();
1988 0 : let old_pageserver_connstr = state
1989 0 : .pspec
1990 0 : .as_ref()
1991 0 : .expect("spec must be set")
1992 0 : .pageserver_connstr
1993 0 : .clone();
1994 0 : let mut unchanged = true;
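     :         // `Condvar::wait_timeout_while` keeps waiting while the closure returns true,
     :         // i.e. while the pageserver connstr is unchanged or until the timeout expires.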
1995 0 : let _ = self
1996 0 : .state_changed
1997 0 : .wait_timeout_while(state, duration, |s| {
1998 0 : let pageserver_connstr = &s
1999 0 : .pspec
2000 0 : .as_ref()
2001 0 : .expect("spec must be set")
2002 0 : .pageserver_connstr;
2003 0 : unchanged = pageserver_connstr == &old_pageserver_connstr;
2004 0 : unchanged
2005 0 : })
2006 0 : .unwrap();
2007 0 : if !unchanged {
2008 0 : info!("Pageserver config changed");
2009 0 : }
2010 0 : }
2011 : }
2012 :
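     : /// Forward a termination request to the child processes: SIGTERM to the
     : /// `sync_safekeepers` child (if running) and a 'fast' shutdown SIGINT to Postgres.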
2013 0 : pub fn forward_termination_signal() {
2014 0 : let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
2015 0 : if ss_pid != 0 {
2016 0 : let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
2017 0 : kill(ss_pid, Signal::SIGTERM).ok();
2018 0 : }
2019 0 : let pg_pid = PG_PID.load(Ordering::SeqCst);
2020 0 : if pg_pid != 0 {
2021 0 : let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
2022 0 :         // Use 'fast' shutdown (SIGINT) because it also creates a shutdown checkpoint, which is important for
2023 0 :         // read-only replicas to get a list of running xacts faster instead of going through the CLOG.
2024 0 : // See https://www.postgresql.org/docs/current/server-shutdown.html for the list of modes and signals.
2025 0 : kill(pg_pid, Signal::SIGINT).ok();
2026 0 : }
2027 0 : }
|