Line data Source code
1 : use std::collections::HashMap;
2 : use std::env;
3 : use std::fs;
4 : use std::io::BufRead;
5 : use std::os::unix::fs::PermissionsExt;
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::atomic::AtomicU32;
10 : use std::sync::atomic::Ordering;
11 : use std::sync::{Condvar, Mutex, RwLock};
12 : use std::thread;
13 : use std::time::Instant;
14 :
15 : use anyhow::{Context, Result};
16 : use chrono::{DateTime, Utc};
17 : use futures::future::join_all;
18 : use futures::stream::FuturesUnordered;
19 : use futures::StreamExt;
20 : use postgres::{Client, NoTls};
21 : use tokio;
22 : use tokio_postgres;
23 : use tracing::{debug, error, info, instrument, warn};
24 : use utils::id::{TenantId, TimelineId};
25 : use utils::lsn::Lsn;
26 :
27 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
28 : use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
29 : use utils::measured_stream::MeasuredReader;
30 :
31 : use remote_storage::{DownloadError, RemotePath};
32 :
33 : use crate::checker::create_availability_check_data;
34 : use crate::logger::inlinify;
35 : use crate::pg_helpers::*;
36 : use crate::spec::*;
37 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
38 : use crate::{config, extension_server};
39 :
40 : pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
41 : pub static PG_PID: AtomicU32 = AtomicU32::new(0);
42 :
43 : /// Compute node info shared across several `compute_ctl` threads.
44 : pub struct ComputeNode {
45 : // Url type maintains proper escaping
46 : pub connstr: url::Url,
47 : pub pgdata: String,
48 : pub pgbin: String,
49 : pub pgversion: String,
50 : /// We should only allow live re-/configuration of the compute node if
51 : /// it uses the 'pull model', i.e. it can go to the control plane and fetch
52 : /// the latest configuration. Otherwise, the following could happen:
53 : /// - we start compute with some spec provided as an argument
54 : /// - we push a new spec and it does a reconfiguration
55 : /// - but then something happens and the compute pod / VM is destroyed,
56 : /// so the k8s controller starts it again with the **old** spec
57 : /// The same goes for empty computes:
58 : /// - we start compute without any spec
59 : /// - we push a spec and it does the configuration
60 : /// - but then it is restarted without any spec again
61 : pub live_config_allowed: bool,
62 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
63 : /// To allow the HTTP API server to serve status requests while configuration
64 : /// is in progress, the lock should be held only for short periods of time to
65 : /// do reads/writes, not for the whole configuration process.
66 : pub state: Mutex<ComputeState>,
67 : /// `Condvar` to allow notifying waiters about state changes.
68 : pub state_changed: Condvar,
69 : /// The address of the extension storage proxy gateway.
70 : pub ext_remote_storage: Option<String>,
71 : // key: ext_archive_name, value: (download start time, download_completed flag)
72 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
73 : pub build_tag: String,
74 : }
75 :
76 : // Some metrics about extension download sizes that might impact startup time.
77 2 : #[derive(Clone, Debug)]
78 : pub struct RemoteExtensionMetrics {
79 : num_ext_downloaded: u64,
80 : largest_ext_size: u64,
81 : total_ext_download_size: u64,
82 : }
83 :
84 1371 : #[derive(Clone, Debug)]
85 : pub struct ComputeState {
86 : pub start_time: DateTime<Utc>,
87 : pub status: ComputeStatus,
88 : /// Timestamp of the last Postgres activity. It could be `None` if
89 : /// the compute hasn't been used since start.
90 : pub last_active: Option<DateTime<Utc>>,
91 : pub error: Option<String>,
92 : pub pspec: Option<ParsedSpec>,
93 : pub metrics: ComputeMetrics,
94 : }
95 :
96 : impl ComputeState {
97 575 : pub fn new() -> Self {
98 575 : Self {
99 575 : start_time: Utc::now(),
100 575 : status: ComputeStatus::Empty,
101 575 : last_active: None,
102 575 : error: None,
103 575 : pspec: None,
104 575 : metrics: ComputeMetrics::default(),
105 575 : }
106 575 : }
107 : }
108 :
109 : impl Default for ComputeState {
110 0 : fn default() -> Self {
111 0 : Self::new()
112 0 : }
113 : }
114 :
115 2118 : #[derive(Clone, Debug)]
116 : pub struct ParsedSpec {
117 : pub spec: ComputeSpec,
118 : pub tenant_id: TenantId,
119 : pub timeline_id: TimelineId,
120 : pub pageserver_connstr: String,
121 : pub safekeeper_connstrings: Vec<String>,
122 : pub storage_auth_token: Option<String>,
123 : }
124 :
125 : impl TryFrom<ComputeSpec> for ParsedSpec {
126 : type Error = String;
127 796 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
128 : // Extract the options from the spec file that are needed to connect to
129 : // the storage system.
130 : //
131 : // For backwards-compatibility, the top-level fields in the spec file
132 : // may be empty. In that case, we need to dig them from the GUCs in the
133 : // cluster.settings field.
134 796 : let pageserver_connstr = spec
135 796 : .pageserver_connstring
136 796 : .clone()
137 796 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
138 796 : .ok_or("pageserver connstr should be provided")?;
139 796 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
140 49 : if matches!(spec.mode, ComputeMode::Primary) {
141 0 : spec.cluster
142 0 : .settings
143 0 : .find("neon.safekeepers")
144 0 : .ok_or("safekeeper connstrings should be provided")?
145 0 : .split(',')
146 0 : .map(|str| str.to_string())
147 0 : .collect()
148 : } else {
149 49 : vec![]
150 : }
151 : } else {
152 747 : spec.safekeeper_connstrings.clone()
153 : };
154 796 : let storage_auth_token = spec.storage_auth_token.clone();
155 796 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
156 796 : tenant_id
157 : } else {
158 0 : spec.cluster
159 0 : .settings
160 0 : .find("neon.tenant_id")
161 0 : .ok_or("tenant id should be provided")
162 0 : .map(|s| TenantId::from_str(&s))?
163 0 : .or(Err("invalid tenant id"))?
164 : };
165 796 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
166 796 : timeline_id
167 : } else {
168 0 : spec.cluster
169 0 : .settings
170 0 : .find("neon.timeline_id")
171 0 : .ok_or("timeline id should be provided")
172 0 : .map(|s| TimelineId::from_str(&s))?
173 0 : .or(Err("invalid timeline id"))?
174 : };
175 :
176 796 : Ok(ParsedSpec {
177 796 : spec,
178 796 : pageserver_connstr,
179 796 : safekeeper_connstrings,
180 796 : storage_auth_token,
181 796 : tenant_id,
182 796 : timeline_id,
183 796 : })
184 796 : }
185 : }
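// A minimal usage sketch of the `TryFrom` impl above; `parse_spec` is a
// hypothetical helper, not part of the measured file. The conversion error is
// a plain `String`, so callers wanting an `anyhow::Error` have to wrap it.
fn parse_spec(spec: ComputeSpec) -> anyhow::Result<ParsedSpec> {
    ParsedSpec::try_from(spec).map_err(|e| anyhow::anyhow!(e))
}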
186 :
187 : /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
188 : /// cgroup. Otherwise returns the default `Command::new(cmd)`
189 : ///
190 : /// This function should be used to start postgres, as it will start it in the
191 : /// neon-postgres cgroup if we are a VM. This allows autoscaling to control
192 : /// postgres' resource usage. The cgroup will exist in VMs because vm-builder
193 : /// creates it during the sysinit phase of its inittab.
194 1506 : fn maybe_cgexec(cmd: &str) -> Command {
195 1506 : // The cplane sets this env var for autoscaling computes.
196 1506 : // Use `var_os` so we don't have to worry about the variable not being valid
197 1506 : // unicode. That should never be a concern... but just in case.
198 1506 : if env::var_os("AUTOSCALING").is_some() {
199 0 : let mut command = Command::new("cgexec");
200 0 : command.args(["-g", "memory:neon-postgres"]);
201 0 : command.arg(cmd);
202 0 : command
203 : } else {
204 1506 : Command::new(cmd)
205 : }
206 1506 : }
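// A usage sketch for `maybe_cgexec` (not part of the measured file); the paths
// are hypothetical. With AUTOSCALING set, this spawns the equivalent of
// `cgexec -g memory:neon-postgres /usr/local/bin/postgres -D /var/lib/pgdata`.
fn spawn_postgres_sketch() -> std::io::Result<std::process::Child> {
    let mut cmd = maybe_cgexec("/usr/local/bin/postgres");
    cmd.args(["-D", "/var/lib/pgdata"]);
    cmd.spawn()
}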
207 :
208 : /// Create the special `neon_superuser` role, a slightly nerfed version of a real
209 : /// superuser that we give to customers.
210 8 : #[instrument(skip_all)]
211 : fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
212 : let roles = spec
213 : .cluster
214 : .roles
215 : .iter()
216 0 : .map(|r| escape_literal(&r.name))
217 : .collect::<Vec<_>>();
218 :
219 : let dbs = spec
220 : .cluster
221 : .databases
222 : .iter()
223 0 : .map(|db| escape_literal(&db.name))
224 : .collect::<Vec<_>>();
225 :
226 : let roles_decl = if roles.is_empty() {
227 : String::from("roles text[] := NULL;")
228 : } else {
229 : format!(
230 : r#"
231 : roles text[] := ARRAY(SELECT rolname
232 : FROM pg_catalog.pg_roles
233 : WHERE rolname IN ({}));"#,
234 : roles.join(", ")
235 : )
236 : };
237 :
238 : let database_decl = if dbs.is_empty() {
239 : String::from("dbs text[] := NULL;")
240 : } else {
241 : format!(
242 : r#"
243 : dbs text[] := ARRAY(SELECT datname
244 : FROM pg_catalog.pg_database
245 : WHERE datname IN ({}));"#,
246 : dbs.join(", ")
247 : )
248 : };
249 :
250 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
251 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
252 : let query = format!(
253 : r#"
254 : DO $$
255 : DECLARE
256 : r text;
257 : {}
258 : {}
259 : BEGIN
260 : IF NOT EXISTS (
261 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
262 : THEN
263 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
264 : IF array_length(roles, 1) IS NOT NULL THEN
265 : EXECUTE format('GRANT neon_superuser TO %s',
266 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
267 : FOREACH r IN ARRAY roles LOOP
268 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
269 : END LOOP;
270 : END IF;
271 : IF array_length(dbs, 1) IS NOT NULL THEN
272 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
273 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
274 : END IF;
275 : END IF;
276 : END
277 : $$;"#,
278 : roles_decl, database_decl,
279 : );
280 8 : info!("Neon superuser created: {}", inlinify(&query));
281 : client
282 : .simple_query(&query)
283 0 : .map_err(|e| anyhow::anyhow!(e).context(query))?;
284 : Ok(())
285 : }
286 :
287 : impl ComputeNode {
288 : /// Check whether the compute node has the corresponding feature enabled.
289 12552 : pub fn has_feature(&self, feature: ComputeFeature) -> bool {
290 12552 : let state = self.state.lock().unwrap();
291 :
292 12552 : if let Some(s) = state.pspec.as_ref() {
293 12552 : s.spec.features.contains(&feature)
294 : } else {
295 0 : false
296 : }
297 12552 : }
298 :
299 796 : pub fn set_status(&self, status: ComputeStatus) {
300 796 : let mut state = self.state.lock().unwrap();
301 796 : state.status = status;
302 796 : self.state_changed.notify_all();
303 796 : }
304 :
305 0 : pub fn get_status(&self) -> ComputeStatus {
306 0 : self.state.lock().unwrap().status
307 0 : }
308 :
309 : // Remove the `pgdata` directory and create it again with the right permissions.
310 575 : fn create_pgdata(&self) -> Result<()> {
311 575 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
312 575 : // If it is something different then create_dir() will error out anyway.
313 575 : let _ok = fs::remove_dir_all(&self.pgdata);
314 575 : fs::create_dir(&self.pgdata)?;
315 575 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
316 :
317 575 : Ok(())
318 575 : }
319 :
320 : // Get a basebackup from the pageserver, using the libpq connection in `connstr`,
321 : // and unarchive it into the `pgdata` directory, overwriting all its previous content.
322 575 : #[instrument(skip_all, fields(%lsn))]
323 : fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
324 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
325 : let start_time = Instant::now();
326 :
327 : let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
328 :
329 : // Use the storage auth token from the config file, if given.
330 : // Note: this overrides any password set in the connection string.
331 : if let Some(storage_auth_token) = &spec.storage_auth_token {
332 15 : info!("Got storage auth token from spec file");
333 : config.password(storage_auth_token);
334 : } else {
335 560 : info!("Storage auth token not set");
336 : }
337 :
338 : // Connect to pageserver
339 : let mut client = config.connect(NoTls)?;
340 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
341 :
342 : let basebackup_cmd = match lsn {
343 : // HACK: we don't use compression on the first start (Lsn(0)) because there's no API for it
344 : Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
345 : _ => format!(
346 : "basebackup {} {} {} --gzip",
347 : spec.tenant_id, spec.timeline_id, lsn
348 : ),
349 : };
350 :
351 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
352 : let mut measured_reader = MeasuredReader::new(copyreader);
353 :
354 : // Check the magic number to see if it's a gzip or not. Even though
355 : // we might explicitly ask for gzip, an old pageserver with no implementation
356 : // of gzip compression might send us uncompressed data. After some time
357 : // passes we can assume all pageservers know how to compress and we can
358 : // delete this check.
359 : //
360 : // If the data is not gzip, it will be tar. It will not be mistakenly
361 : // recognized as gzip because tar starts with an ascii encoding of a filename,
362 : // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
363 : // we send the "global" directory first from the pageserver, so it definitely
364 : // won't be recognized as gzip.
365 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
366 : let gzip = {
367 : let peek = bufreader.fill_buf().unwrap();
368 : peek[0] == 0x1f && peek[1] == 0x8b
369 : };
370 :
371 : // Read the archive directly from the `CopyOutReader`
372 : //
373 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
374 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
375 : // sends an Error after finishing the tarball, we will not notice it.
376 : if gzip {
377 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
378 : ar.set_ignore_zeros(true);
379 : ar.unpack(&self.pgdata)?;
380 : } else {
381 : let mut ar = tar::Archive::new(&mut bufreader);
382 : ar.set_ignore_zeros(true);
383 : ar.unpack(&self.pgdata)?;
384 : };
385 :
386 : // Report metrics
387 : let mut state = self.state.lock().unwrap();
388 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
389 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
390 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
391 : Ok(())
392 : }
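// A self-contained sketch of the magic-number probe used above: gzip streams
// start with the two bytes 0x1f 0x8b (RFC 1952); anything else coming out of
// the copy stream is assumed to be plain tar.
fn looks_like_gzip(peek: &[u8]) -> bool {
    peek.len() >= 2 && peek[0] == 0x1f && peek[1] == 0x8b
}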
393 :
394 : // Gets the basebackup in a retry loop
395 575 : #[instrument(skip_all, fields(%lsn))]
396 : pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
397 : let mut retry_period_ms = 500;
398 : let mut attempts = 0;
399 : let max_attempts = 5;
400 : loop {
401 : let result = self.try_get_basebackup(compute_state, lsn);
402 : match result {
403 : Ok(_) => {
404 : return result;
405 : }
406 : Err(ref e) if attempts < max_attempts => {
407 0 : warn!(
408 0 : "Failed to get basebackup: {} (attempt {}/{})",
409 0 : e, attempts, max_attempts
410 0 : );
411 : std::thread::sleep(std::time::Duration::from_millis(retry_period_ms));
412 : retry_period_ms *= 2;
413 : }
414 : Err(_) => {
415 : return result;
416 : }
417 : }
418 : attempts += 1;
419 : }
420 : }
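// The loop above is a doubling backoff: waits of 500ms, 1s, 2s, 4s and 8s
// between attempts. A generic sketch of the same pattern (names hypothetical):
fn retry_with_backoff<T, E: std::fmt::Display>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay_ms = 500;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt < max_attempts => {
                eprintln!("attempt {}/{} failed: {}", attempt, max_attempts, e);
                std::thread::sleep(std::time::Duration::from_millis(delay_ms));
                delay_ms *= 2;
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}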
421 :
422 526 : pub async fn check_safekeepers_synced_async(
423 526 : &self,
424 526 : compute_state: &ComputeState,
425 526 : ) -> Result<Option<Lsn>> {
426 526 : // Construct a connection config for each safekeeper
427 526 : let pspec: ParsedSpec = compute_state
428 526 : .pspec
429 526 : .as_ref()
430 526 : .expect("spec must be set")
431 526 : .clone();
432 526 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
433 667 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
434 667 : // Format connstr
435 667 : let id = connstr.clone();
436 667 : let connstr = format!("postgresql://no_user@{}", connstr);
437 667 : let options = format!(
438 667 : "-c timeline_id={} tenant_id={}",
439 667 : pspec.timeline_id, pspec.tenant_id
440 667 : );
441 667 :
442 667 : // Construct client
443 667 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
444 667 : config.options(&options);
445 667 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
446 26 : config.password(storage_auth_token);
447 641 : }
448 :
449 667 : (id, config)
450 667 : });
451 526 :
452 526 : // Create task set to query all safekeepers
453 526 : let mut tasks = FuturesUnordered::new();
454 526 : let quorum = sk_configs.len() / 2 + 1;
455 1193 : for (id, config) in sk_configs {
456 667 : let timeout = tokio::time::Duration::from_millis(100);
457 667 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
458 667 : tasks.push(tokio::spawn(task));
459 667 : }
460 :
461 : // Get a quorum of responses or errors
462 526 : let mut responses = Vec::new();
463 526 : let mut join_errors = Vec::new();
464 526 : let mut task_errors = Vec::new();
465 526 : let mut timeout_errors = Vec::new();
466 1120 : while let Some(response) = tasks.next().await {
467 609 : match response {
468 598 : Ok(Ok(Ok(r))) => responses.push(r),
469 11 : Ok(Ok(Err(e))) => task_errors.push(e),
470 0 : Ok(Err(e)) => timeout_errors.push(e),
471 0 : Err(e) => join_errors.push(e),
472 : };
473 609 : if responses.len() >= quorum {
474 526 : break;
475 83 : }
476 83 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
477 0 : break;
478 83 : }
479 : }
480 :
481 : // In case of error, log and fail the check, but don't crash.
482 : // We're playing it safe because these errors could be transient
483 : // and we don't yet retry. Also being careful here allows us to
484 : // be backwards compatible with safekeepers that don't have the
485 : // TIMELINE_STATUS API yet.
486 526 : if responses.len() < quorum {
487 0 : error!(
488 0 : "failed sync safekeepers check {:?} {:?} {:?}",
489 0 : join_errors, task_errors, timeout_errors
490 0 : );
491 0 : return Ok(None);
492 526 : }
493 526 :
494 526 : Ok(check_if_synced(responses))
495 526 : }
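// The quorum rule above is a simple majority. A tiny sketch: for 1, 2, 3, 4, 5
// safekeepers the quorum is 1, 2, 2, 3, 3 respectively.
fn quorum(n_safekeepers: usize) -> usize {
    n_safekeepers / 2 + 1
}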
496 :
497 : // Fast path for sync_safekeepers. If they're already synced we get the LSN
498 : // in one roundtrip. If not, we should do a full sync_safekeepers.
499 526 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
500 526 : let start_time = Utc::now();
501 526 :
502 526 : // Run actual work with new tokio runtime
503 526 : let rt = tokio::runtime::Builder::new_current_thread()
504 526 : .enable_all()
505 526 : .build()
506 526 : .expect("failed to create rt");
507 526 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
508 526 :
509 526 : // Record runtime
510 526 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
511 526 : .signed_duration_since(start_time)
512 526 : .to_std()
513 526 : .unwrap()
514 526 : .as_millis() as u64;
515 526 : result
516 526 : }
517 :
518 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
519 : // and return the reported LSN back to the caller.
520 931 : #[instrument(skip_all)]
521 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
522 : let start_time = Utc::now();
523 :
524 : let mut sync_handle = maybe_cgexec(&self.pgbin)
525 : .args(["--sync-safekeepers"])
526 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
527 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
528 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
529 : } else {
530 : vec![]
531 : })
532 : .stdout(Stdio::piped())
533 : .stderr(Stdio::piped())
534 : .spawn()
535 : .expect("postgres --sync-safekeepers failed to start");
536 : SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
537 :
538 : // `postgres --sync-safekeepers` will print all log output to stderr and
539 : // final LSN to stdout. So we leave stdout to collect LSN, while stderr logs
540 : // will be collected in a child thread.
541 : let stderr = sync_handle
542 : .stderr
543 : .take()
544 : .expect("stderr should be captured");
545 : let logs_handle = handle_postgres_logs(stderr);
546 :
547 : let sync_output = sync_handle
548 : .wait_with_output()
549 : .expect("postgres --sync-safekeepers failed");
550 : SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
551 :
552 : // Process has exited, so we can join the logs thread.
553 : let _ = logs_handle
554 : .join()
555 0 : .map_err(|e| tracing::error!("log thread panicked: {:?}", e));
556 :
557 : if !sync_output.status.success() {
558 : anyhow::bail!(
559 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
560 : sync_output.status,
561 : String::from_utf8(sync_output.stdout)
562 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
563 : );
564 : }
565 :
566 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
567 : .signed_duration_since(start_time)
568 : .to_std()
569 : .unwrap()
570 : .as_millis() as u64;
571 :
572 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
573 :
574 : Ok(lsn)
575 : }
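// `postgres --sync-safekeepers` prints the final LSN to stdout in the usual
// Postgres `hi/lo` hex notation, which is what the `Lsn::from_str` call above
// parses. A sketch with a made-up value:
fn parse_sync_output_sketch() -> Result<Lsn> {
    let stdout = "0/16B3748\n"; // hypothetical --sync-safekeepers output
    Ok(Lsn::from_str(stdout.trim())?)
}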
576 :
577 : /// Do all the preparations like PGDATA directory creation, configuration,
578 : /// safekeepers sync, basebackup, etc.
579 575 : #[instrument(skip_all)]
580 : pub fn prepare_pgdata(
581 : &self,
582 : compute_state: &ComputeState,
583 : extension_server_port: u16,
584 : ) -> Result<()> {
585 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
586 : let spec = &pspec.spec;
587 : let pgdata_path = Path::new(&self.pgdata);
588 :
589 : // Remove/create an empty pgdata directory and put configuration there.
590 : self.create_pgdata()?;
591 : config::write_postgres_conf(
592 : &pgdata_path.join("postgresql.conf"),
593 : &pspec.spec,
594 : Some(extension_server_port),
595 : )?;
596 :
597 : // Syncing safekeepers is only safe with primary nodes: if a primary
598 : // is already connected it will be kicked out, so a secondary (standby)
599 : // cannot sync safekeepers.
600 : let lsn = match spec.mode {
601 : ComputeMode::Primary => {
602 526 : info!("checking if safekeepers are synced");
603 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
604 : lsn
605 : } else {
606 405 : info!("starting safekeepers syncing");
607 : self.sync_safekeepers(pspec.storage_auth_token.clone())
608 0 : .with_context(|| "failed to sync safekeepers")?
609 : };
610 526 : info!("safekeepers synced at LSN {}", lsn);
611 : lsn
612 : }
613 : ComputeMode::Static(lsn) => {
614 44 : info!("Starting read-only node at static LSN {}", lsn);
615 : lsn
616 : }
617 : ComputeMode::Replica => {
618 5 : info!("Initializing standby from latest Pageserver LSN");
619 : Lsn(0)
620 : }
621 : };
622 :
623 575 : info!(
624 575 : "getting basebackup@{} from pageserver {}",
625 575 : lsn, &pspec.pageserver_connstr
626 575 : );
627 0 : self.get_basebackup(compute_state, lsn).with_context(|| {
628 0 : format!(
629 0 : "failed to get basebackup@{} from pageserver {}",
630 0 : lsn, &pspec.pageserver_connstr
631 0 : )
632 0 : })?;
633 :
634 : // Update pg_hba.conf received with basebackup.
635 : update_pg_hba(pgdata_path)?;
636 :
637 : match spec.mode {
638 : ComputeMode::Primary => {}
639 : ComputeMode::Replica | ComputeMode::Static(..) => {
640 : add_standby_signal(pgdata_path)?;
641 : }
642 : }
643 :
644 : Ok(())
645 : }
646 :
647 : /// Start and stop a postgres process to warm up the VM for startup.
648 0 : pub fn prewarm_postgres(&self) -> Result<()> {
649 0 : info!("prewarming");
650 :
651 : // Create pgdata
652 0 : let pgdata = &format!("{}.warmup", self.pgdata);
653 0 : create_pgdata(pgdata)?;
654 :
655 : // Run initdb to completion
656 0 : info!("running initdb");
657 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
658 0 : Command::new(initdb_bin)
659 0 : .args(["-D", pgdata])
660 0 : .output()
661 0 : .expect("cannot start initdb process");
662 0 :
663 0 : // Write conf
664 0 : use std::io::Write;
665 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
666 0 : let mut file = std::fs::File::create(conf_path)?;
667 0 : writeln!(file, "shared_buffers=65536")?;
668 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
669 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
670 :
671 : // Start postgres
672 0 : info!("starting postgres");
673 0 : let mut pg = maybe_cgexec(&self.pgbin)
674 0 : .args(["-D", pgdata])
675 0 : .spawn()
676 0 : .expect("cannot start postgres process");
677 0 :
678 0 : // Stop it when it's ready
679 0 : info!("waiting for postgres");
680 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
681 0 : pg.kill()?;
682 0 : info!("sent kill signal");
683 0 : pg.wait()?;
684 0 : info!("done prewarming");
685 :
686 : // clean up
687 0 : let _ok = fs::remove_dir_all(pgdata);
688 0 : Ok(())
689 0 : }
690 :
691 : /// Start Postgres as a child process and wait until it is ready to accept
692 : /// connections.
693 : /// Returns a handle to the child process and a handle to the logs thread.
694 575 : #[instrument(skip_all)]
695 : pub fn start_postgres(
696 : &self,
697 : storage_auth_token: Option<String>,
698 : ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
699 : let pgdata_path = Path::new(&self.pgdata);
700 :
701 : // Run postgres as a child process.
702 : let mut pg = maybe_cgexec(&self.pgbin)
703 : .args(["-D", &self.pgdata])
704 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
705 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
706 : } else {
707 : vec![]
708 : })
709 : .stderr(Stdio::piped())
710 : .spawn()
711 : .expect("cannot start postgres process");
712 : PG_PID.store(pg.id(), Ordering::SeqCst);
713 :
714 : // Start a thread to collect logs from stderr.
715 : let stderr = pg.stderr.take().expect("stderr should be captured");
716 : let logs_handle = handle_postgres_logs(stderr);
717 :
718 : wait_for_postgres(&mut pg, pgdata_path)?;
719 :
720 : Ok((pg, logs_handle))
721 : }
722 :
723 : /// Do initial configuration of the already started Postgres.
724 8 : #[instrument(skip_all)]
725 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
726 : // If connection fails,
727 : // it may be the old node with `zenith_admin` superuser.
728 : //
729 : // In this case we need to connect with old `zenith_admin` name
730 : // and create new user. We cannot simply rename connected user,
731 : // but we can create a new one and grant it all privileges.
732 : let connstr = self.connstr.clone();
733 : let mut client = match Client::connect(connstr.as_str(), NoTls) {
734 : Err(e) => {
735 0 : info!(
736 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
737 0 : e
738 0 : );
739 : let mut zenith_admin_connstr = connstr.clone();
740 :
741 : zenith_admin_connstr
742 : .set_username("zenith_admin")
743 0 : .map_err(|_| anyhow::anyhow!("invalid connstr"))?;
744 :
745 : let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
746 : // Disable forwarding so that users don't get a cloud_admin role
747 : client.simple_query("SET neon.forward_ddl = false")?;
748 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
749 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
750 : drop(client);
751 :
752 : // reconnect with connstring with expected name
753 : Client::connect(connstr.as_str(), NoTls)?
754 : }
755 : Ok(client) => client,
756 : };
757 :
758 : // Disable DDL forwarding because control plane already knows about these roles/databases.
759 : client.simple_query("SET neon.forward_ddl = false")?;
760 :
761 : // Proceed with post-startup configuration. Note, that order of operations is important.
762 : let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
763 : create_neon_superuser(spec, &mut client)?;
764 : cleanup_instance(&mut client)?;
765 : handle_roles(spec, &mut client)?;
766 : handle_databases(spec, &mut client)?;
767 : handle_role_deletions(spec, connstr.as_str(), &mut client)?;
768 : handle_grants(spec, &mut client, connstr.as_str())?;
769 : handle_extensions(spec, &mut client)?;
770 : handle_extension_neon(&mut client)?;
771 : create_availability_check_data(&mut client)?;
772 :
773 : // 'Close' connection
774 : drop(client);
775 :
776 : if self.has_feature(ComputeFeature::Migrations) {
777 4 : thread::spawn(move || {
778 4 : let mut client = Client::connect(connstr.as_str(), NoTls)?;
779 4 : handle_migrations(&mut client)
780 4 : });
781 : }
782 : Ok(())
783 : }
784 :
785 : // Reload the Postgres configuration via `pg_ctl reload`. We could've used the
786 : // SQL function `pg_reload_conf()` instead, but `pg_ctl reload` only needs the
787 : // data directory, so it works even without an open connection to Postgres.
788 458 : #[instrument(skip_all)]
789 : fn pg_reload_conf(&self) -> Result<()> {
790 : let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
791 : Command::new(pgctl_bin)
792 : .args(["reload", "-D", &self.pgdata])
793 : .output()
794 : .expect("cannot run pg_ctl process");
795 : Ok(())
796 : }
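// A hedged sketch of the SQL alternative mentioned above (not part of the
// measured file): Postgres exposes `pg_reload_conf()` as a SQL function, so
// with an already-open superuser `client` the same reload could be done
// without `pg_ctl`.
fn pg_reload_conf_via_sql(client: &mut Client) -> Result<()> {
    client.simple_query("SELECT pg_reload_conf()")?;
    Ok(())
}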
797 :
798 : /// Similar to `apply_config()`, but performs a slightly different sequence of operations,
799 : /// as it's used to reconfigure a previously started and configured Postgres node.
800 221 : #[instrument(skip_all)]
801 : pub fn reconfigure(&self) -> Result<()> {
802 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
803 :
804 : if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
805 0 : info!("tuning pgbouncer");
806 :
807 : let rt = tokio::runtime::Builder::new_current_thread()
808 : .enable_all()
809 : .build()
810 : .expect("failed to create rt");
811 :
812 : // Spawn a thread to do the tuning,
813 : // so that we don't block the main thread that starts Postgres.
814 : let pgbouncer_settings = pgbouncer_settings.clone();
815 0 : let _handle = thread::spawn(move || {
816 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
817 0 : if let Err(err) = res {
818 0 : error!("error while tuning pgbouncer: {err:?}");
819 0 : }
820 0 : });
821 : }
822 :
823 : // Write new config
824 : let pgdata_path = Path::new(&self.pgdata);
825 : let postgresql_conf_path = pgdata_path.join("postgresql.conf");
826 : config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
827 : // temporarily reset max_cluster_size in config
828 : // to avoid the possibility of hitting the limit, while we are reconfiguring:
829 : // creating new extensions, roles, etc...
830 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
831 : self.pg_reload_conf()?;
832 :
833 : let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
834 :
835 : // Proceed with post-startup configuration. Note, that order of operations is important.
836 : // Disable DDL forwarding because control plane already knows about these roles/databases.
837 : if spec.mode == ComputeMode::Primary {
838 : client.simple_query("SET neon.forward_ddl = false")?;
839 : cleanup_instance(&mut client)?;
840 : handle_roles(&spec, &mut client)?;
841 : handle_databases(&spec, &mut client)?;
842 : handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
843 : handle_grants(&spec, &mut client, self.connstr.as_str())?;
844 : handle_extensions(&spec, &mut client)?;
845 : handle_extension_neon(&mut client)?;
846 : // We can skip handle_migrations here because a new migration can only appear
847 : // if we have a new version of the compute_ctl binary, which can only happen
848 : // if compute got restarted, in which case we'll end up inside of apply_config
849 : // instead of reconfigure.
850 : }
851 :
852 : // 'Close' connection
853 : drop(client);
854 :
855 : // reset max_cluster_size in config back to original value and reload config
856 : config::compute_ctl_temp_override_remove(pgdata_path)?;
857 : self.pg_reload_conf()?;
858 :
859 : let unknown_op = "unknown".to_string();
860 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
861 221 : info!(
862 221 : "finished reconfiguration of compute node for operation {}",
863 221 : op_id
864 221 : );
865 :
866 : Ok(())
867 : }
868 :
869 575 : #[instrument(skip_all)]
870 : pub fn start_compute(
871 : &self,
872 : extension_server_port: u16,
873 : ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
874 : let compute_state = self.state.lock().unwrap().clone();
875 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
876 575 : info!(
877 575 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
878 575 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
879 575 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
880 575 : pspec.tenant_id,
881 575 : pspec.timeline_id,
882 575 : );
883 :
884 : // tune pgbouncer
885 : if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
886 0 : info!("tuning pgbouncer");
887 :
888 : let rt = tokio::runtime::Builder::new_current_thread()
889 : .enable_all()
890 : .build()
891 : .expect("failed to create rt");
892 :
893 : // Spawn a thread to do the tuning,
894 : // so that we don't block the main thread that starts Postgres.
895 : let pgbouncer_settings = pgbouncer_settings.clone();
896 0 : let _handle = thread::spawn(move || {
897 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
898 0 : if let Err(err) = res {
899 0 : error!("error while tuning pgbouncer: {err:?}");
900 0 : }
901 0 : });
902 : }
903 :
904 575 : info!(
905 575 : "start_compute spec.remote_extensions {:?}",
906 575 : pspec.spec.remote_extensions
907 575 : );
908 :
909 : // This part is sync, because we need to download
910 : // remote shared_preload_libraries before postgres start (if any)
911 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
912 : // First, create control files for all available extensions
913 : extension_server::create_control_files(remote_extensions, &self.pgbin);
914 :
915 : let library_load_start_time = Utc::now();
916 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
917 :
918 : let library_load_time = Utc::now()
919 : .signed_duration_since(library_load_start_time)
920 : .to_std()
921 : .unwrap()
922 : .as_millis() as u64;
923 : let mut state = self.state.lock().unwrap();
924 : state.metrics.load_ext_ms = library_load_time;
925 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
926 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
927 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
928 1 : info!(
929 1 : "Loading shared_preload_libraries took {:?}ms",
930 1 : library_load_time
931 1 : );
932 1 : info!("{:?}", remote_ext_metrics);
933 : }
934 :
935 : self.prepare_pgdata(&compute_state, extension_server_port)?;
936 :
937 : let start_time = Utc::now();
938 : let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
939 :
940 : let config_time = Utc::now();
941 : if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
942 : let pgdata_path = Path::new(&self.pgdata);
943 : // temporarily reset max_cluster_size in config
944 : // to avoid the possibility of hitting the limit, while we are applying config:
945 : // creating new extensions, roles, etc...
946 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
947 : self.pg_reload_conf()?;
948 :
949 : self.apply_config(&compute_state)?;
950 :
951 : config::compute_ctl_temp_override_remove(pgdata_path)?;
952 : self.pg_reload_conf()?;
953 : }
954 :
955 : let startup_end_time = Utc::now();
956 : {
957 : let mut state = self.state.lock().unwrap();
958 : state.metrics.start_postgres_ms = config_time
959 : .signed_duration_since(start_time)
960 : .to_std()
961 : .unwrap()
962 : .as_millis() as u64;
963 : state.metrics.config_ms = startup_end_time
964 : .signed_duration_since(config_time)
965 : .to_std()
966 : .unwrap()
967 : .as_millis() as u64;
968 : state.metrics.total_startup_ms = startup_end_time
969 : .signed_duration_since(compute_state.start_time)
970 : .to_std()
971 : .unwrap()
972 : .as_millis() as u64;
973 : }
974 : self.set_status(ComputeStatus::Running);
975 :
976 575 : info!(
977 575 : "finished configuration of compute for project {}",
978 575 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
979 575 : );
980 :
981 : // Log metrics so that we can search for slow operations in logs
982 : let metrics = {
983 : let state = self.state.lock().unwrap();
984 : state.metrics.clone()
985 : };
986 575 : info!(?metrics, "compute start finished");
987 :
988 : Ok(pg_process)
989 : }
990 :
991 12194 : /// Update `last_active` in the shared state, but only if the new value is more recent.
992 12194 : pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
993 12194 : let mut state = self.state.lock().unwrap();
994 12194 : // NB: `Some(<DateTime>)` is always greater than `None`.
995 12194 : if last_active > state.last_active {
996 280 : state.last_active = last_active;
997 280 : debug!("set the last compute activity time to: {:?}", last_active);
998 11914 : }
999 12194 : }
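// The comparison above leans on the derived `Ord` for `Option<T>`: `None`
// sorts before any `Some(_)`. A one-line illustration (hypothetical helper):
fn option_ordering_sketch() {
    assert!(Some(Utc::now()) > None); // a fresh timestamp beats "never active"
}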
1000 :
1001 : // Look for core dumps and collect backtraces.
1002 : //
1003 : // EKS worker nodes have the following core dump settings:
1004 : // /proc/sys/kernel/core_pattern -> core
1005 : // /proc/sys/kernel/core_uses_pid -> 1
1006 : // ulimit -c -> unlimited
1007 : // which results in core dumps being written to postgres data directory as core.<pid>.
1008 : //
1009 : // Use that as the default location and pattern, except on macOS, where core dumps
1010 : // are written to the /cores/ directory by default.
1011 574 : pub fn check_for_core_dumps(&self) -> Result<()> {
1012 574 : let core_dump_dir = match std::env::consts::OS {
1013 574 : "macos" => Path::new("/cores/"),
1014 574 : _ => Path::new(&self.pgdata),
1015 : };
1016 :
1017 : // Collect core dump paths if any
1018 574 : info!("checking for core dumps in {}", core_dump_dir.display());
1019 574 : let files = fs::read_dir(core_dump_dir)?;
1020 13830 : let cores = files.filter_map(|entry| {
1021 13830 : let entry = entry.ok()?;
1022 13830 : let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
1023 0 : Some(entry.path())
1024 13830 : });
1025 :
1026 : // Print backtrace for each core dump
1027 574 : for core_path in cores {
1028 0 : warn!(
1029 0 : "core dump found: {}, collecting backtrace",
1030 0 : core_path.display()
1031 0 : );
1032 :
1033 : // Try first with gdb
1034 0 : let backtrace = Command::new("gdb")
1035 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
1036 0 : .arg(&core_path)
1037 0 : .output();
1038 :
1039 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
1040 0 : let backtrace = match backtrace {
1041 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
1042 0 : warn!("cannot find gdb, trying lldb");
1043 0 : Command::new("lldb")
1044 0 : .arg("-c")
1045 0 : .arg(&core_path)
1046 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
1047 0 : .output()
1048 : }
1049 0 : _ => backtrace,
1050 0 : }?;
1051 :
1052 0 : warn!(
1053 0 : "core dump backtrace: {}",
1054 0 : String::from_utf8_lossy(&backtrace.stdout)
1055 0 : );
1056 0 : warn!(
1057 0 : "debugger stderr: {}",
1058 0 : String::from_utf8_lossy(&backtrace.stderr)
1059 0 : );
1060 : }
1061 :
1062 574 : Ok(())
1063 574 : }
1064 :
1065 : /// Select `pg_stat_statements` data and return it as stringified JSON
1066 0 : pub async fn collect_insights(&self) -> String {
1067 0 : let mut result_rows: Vec<String> = Vec::new();
1068 0 : let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
1069 0 : let (client, connection) = connect_result.unwrap();
1070 0 : tokio::spawn(async move {
1071 0 : if let Err(e) = connection.await {
1072 0 : eprintln!("connection error: {}", e);
1073 0 : }
1074 0 : });
1075 0 : let result = client
1076 0 : .simple_query(
1077 0 : "SELECT
1078 0 : row_to_json(pg_stat_statements)
1079 0 : FROM
1080 0 : pg_stat_statements
1081 0 : WHERE
1082 0 : userid != 'cloud_admin'::regrole::oid
1083 0 : ORDER BY
1084 0 : (mean_exec_time + mean_plan_time) DESC
1085 0 : LIMIT 100",
1086 0 : )
1087 0 : .await;
1088 :
1089 0 : if let Ok(raw_rows) = result {
1090 0 : for message in raw_rows.iter() {
1091 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
1092 0 : if let Some(json) = row.get(0) {
1093 0 : result_rows.push(json.to_string());
1094 0 : }
1095 0 : }
1096 : }
1097 :
1098 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
1099 : } else {
1100 0 : "{{\"pg_stat_statements\": []}}".to_string()
1101 : }
1102 0 : }
1103 :
1104 : // Download an archive, unpack it, and place the files in the correct locations.
1105 2 : pub async fn download_extension(
1106 2 : &self,
1107 2 : real_ext_name: String,
1108 2 : ext_path: RemotePath,
1109 2 : ) -> Result<u64, DownloadError> {
1110 2 : let ext_remote_storage =
1111 2 : self.ext_remote_storage
1112 2 : .as_ref()
1113 2 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
1114 2 : "Remote extensions storage is not configured",
1115 2 : )))?;
1116 :
1117 2 : let ext_archive_name = ext_path.object_name().expect("bad path");
1118 2 :
1119 2 : let mut first_try = false;
1120 2 : if !self
1121 2 : .ext_download_progress
1122 2 : .read()
1123 2 : .expect("lock err")
1124 2 : .contains_key(ext_archive_name)
1125 1 : {
1126 1 : self.ext_download_progress
1127 1 : .write()
1128 1 : .expect("lock err")
1129 1 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
1130 1 : first_try = true;
1131 1 : }
1132 2 : let (download_start, download_completed) =
1133 2 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
1134 2 : let start_time_delta = Utc::now()
1135 2 : .signed_duration_since(download_start)
1136 2 : .to_std()
1137 2 : .unwrap()
1138 2 : .as_millis() as u64;
1139 2 :
1140 2 : // how long to wait for extension download if it was started by another process
1141 2 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
1142 2 :
1143 2 : if download_completed {
1144 1 : info!("extension already downloaded, skipping re-download");
1145 1 : return Ok(0);
1146 1 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
1147 0 : info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
1148 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
1149 : loop {
1150 0 : info!("waiting for download");
1151 0 : interval.tick().await;
1152 0 : let (_, download_completed_now) =
1153 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
1154 0 : if download_completed_now {
1155 0 : info!("download finished by whoever else downloaded it");
1156 0 : return Ok(0);
1157 0 : }
1158 : }
1159 : // NOTE: the above loop will get terminated
1160 : // based on the timeout of the download function
1161 1 : }
1162 :
1163 : // if extension hasn't been downloaded before or the previous
1164 : // attempt to download was at least HANG_TIMEOUT ms ago
1165 : // then we try to download it here
1166 1 : info!("downloading new extension {ext_archive_name}");
1167 :
1168 1 : let download_size = extension_server::download_extension(
1169 1 : &real_ext_name,
1170 1 : &ext_path,
1171 1 : ext_remote_storage,
1172 1 : &self.pgbin,
1173 1 : )
1174 10 : .await
1175 1 : .map_err(DownloadError::Other);
1176 1 :
1177 1 : self.ext_download_progress
1178 1 : .write()
1179 1 : .expect("bad lock")
1180 1 : .insert(ext_archive_name.to_string(), (download_start, true));
1181 1 :
1182 1 : download_size
1183 2 : }
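// A condensed sketch of the coordination pattern above (not part of the
// measured file): the shared progress map acts as a registry so that only one
// task downloads an archive while others poll for completion. Names are
// hypothetical; the real loop is bounded by the caller's download timeout.
async fn wait_for_peer_download(
    progress: &RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
    key: &str,
) {
    let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
    loop {
        interval.tick().await;
        // the read lock is released before the next await point
        if progress.read().expect("lock err")[key].1 {
            return; // another task completed the download
        }
    }
}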
1184 :
1185 : #[tokio::main]
1186 1 : pub async fn prepare_preload_libraries(
1187 1 : &self,
1188 1 : spec: &ComputeSpec,
1189 1 : ) -> Result<RemoteExtensionMetrics> {
1190 1 : if self.ext_remote_storage.is_none() {
1191 0 : return Ok(RemoteExtensionMetrics {
1192 0 : num_ext_downloaded: 0,
1193 0 : largest_ext_size: 0,
1194 0 : total_ext_download_size: 0,
1195 0 : });
1196 1 : }
1197 1 : let remote_extensions = spec
1198 1 : .remote_extensions
1199 1 : .as_ref()
1200 1 : .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
1201 :
1202 1 : info!("parse shared_preload_libraries from spec.cluster.settings");
1203 1 : let mut libs_vec = Vec::new();
1204 1 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1205 0 : libs_vec = libs
1206 0 : .split(&[',', '\'', ' '])
1207 0 : .filter(|s| *s != "neon" && !s.is_empty())
1208 0 : .map(str::to_string)
1209 0 : .collect();
1210 1 : }
1211 1 : info!("parse shared_preload_libraries from provided postgresql.conf");
1212 :
1213 : // This is used in neon_local and in the Python tests.
1214 1 : if let Some(conf) = &spec.cluster.postgresql_conf {
1215 1 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1216 1 : let mut shared_preload_libraries_line = "";
1217 23 : for line in conf_lines {
1218 22 : if line.starts_with("shared_preload_libraries") {
1219 1 : shared_preload_libraries_line = line;
1220 21 : }
1221 : }
1222 1 : let mut preload_libs_vec = Vec::new();
1223 1 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1224 0 : preload_libs_vec = libs
1225 0 : .split(&[',', '\'', ' '])
1226 0 : .filter(|s| *s != "neon" && !s.is_empty())
1227 0 : .map(str::to_string)
1228 0 : .collect();
1229 1 : }
1230 1 : libs_vec.extend(preload_libs_vec);
1231 0 : }
1232 :
1233 : // Don't try to download libraries that are not in the index.
1234 : // Assume that they are already present locally.
1235 1 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1236 1 :
1237 1 : info!("Downloading to shared preload libraries: {:?}", &libs_vec);
1238 :
1239 1 : let mut download_tasks = Vec::new();
1240 1 : for library in &libs_vec {
1241 0 : let (ext_name, ext_path) =
1242 0 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1243 0 : download_tasks.push(self.download_extension(ext_name, ext_path));
1244 : }
1245 1 : let results = join_all(download_tasks).await;
1246 :
1247 1 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1248 1 : num_ext_downloaded: 0,
1249 1 : largest_ext_size: 0,
1250 1 : total_ext_download_size: 0,
1251 1 : };
1252 1 : for result in results {
1253 0 : let download_size = match result {
1254 0 : Ok(res) => {
1255 0 : remote_ext_metrics.num_ext_downloaded += 1;
1256 0 : res
1257 : }
1258 0 : Err(err) => {
1259 0 : // if we failed to download an extension, we don't want to fail the whole
1260 0 : // process, but we do want to log the error
1261 0 : error!("Failed to download extension: {}", err);
1262 0 : 0
1263 : }
1264 : };
1265 :
1266 0 : remote_ext_metrics.largest_ext_size =
1267 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1268 0 : remote_ext_metrics.total_ext_download_size += download_size;
1269 : }
1270 1 : Ok(remote_ext_metrics)
1271 : }
1272 : }
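// A standalone sketch of the shared_preload_libraries parsing performed in
// `prepare_preload_libraries` above: split the GUC value on quotes, commas and
// spaces, and drop the built-in "neon" entry plus empty fragments. For example,
// parse_preload_libraries("'neon,pg_stat_statements'") yields
// ["pg_stat_statements"].
fn parse_preload_libraries(guc_value: &str) -> Vec<String> {
    guc_value
        .split(&[',', '\'', ' '])
        .filter(|s| *s != "neon" && !s.is_empty())
        .map(str::to_string)
        .collect()
}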