TLA Line data Source code
1 : use std::collections::HashMap;
2 : use std::env;
3 : use std::fs;
4 : use std::io::BufRead;
5 : use std::os::unix::fs::PermissionsExt;
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::atomic::AtomicU32;
10 : use std::sync::atomic::Ordering;
11 : use std::sync::{Condvar, Mutex, RwLock};
12 : use std::thread;
13 : use std::time::Instant;
14 :
15 : use anyhow::{Context, Result};
16 : use chrono::{DateTime, Utc};
17 : use futures::future::join_all;
18 : use futures::stream::FuturesUnordered;
19 : use futures::StreamExt;
20 : use postgres::{Client, NoTls};
21 : use tokio;
22 : use tokio_postgres;
23 : use tracing::{error, info, instrument, warn};
24 : use utils::id::{TenantId, TimelineId};
25 : use utils::lsn::Lsn;
26 :
27 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
28 : use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
29 : use utils::measured_stream::MeasuredReader;
30 :
31 : use remote_storage::{DownloadError, RemotePath};
32 :
33 : use crate::checker::create_availability_check_data;
34 : use crate::pg_helpers::*;
35 : use crate::spec::*;
36 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
37 : use crate::{config, extension_server};
38 :
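          : // PIDs of the currently running `postgres --sync-safekeepers` and postgres
          : // child processes (0 means not running), kept in globals so that other
          : // `compute_ctl` threads, e.g. signal handlers, can locate and signal them.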
39 : pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
40 : pub static PG_PID: AtomicU32 = AtomicU32::new(0);
41 :
42 : /// Compute node info shared across several `compute_ctl` threads.
43 : pub struct ComputeNode {
44 : // Url type maintains proper escaping
45 : pub connstr: url::Url,
46 : pub pgdata: String,
47 : pub pgbin: String,
48 : pub pgversion: String,
49 : /// We should only allow live re-/configuration of the compute node if
50 : /// it uses the 'pull model', i.e. it can go to the control plane and fetch
51 : /// the latest configuration. Otherwise, there could be cases like this:
52 : /// - we start compute with some spec provided as an argument
53 : /// - we push a new spec and it does reconfiguration
54 : /// - but then something happens and the compute pod / VM is destroyed,
55 : /// so the k8s controller starts it again with the **old** spec
56 : /// The same goes for empty computes:
57 : /// - we start compute without any spec
58 : /// - we push a spec and it does configuration
59 : /// - but then it is restarted without any spec again
60 : pub live_config_allowed: bool,
61 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
62 : /// To allow the HTTP API server to serve status requests while configuration
63 : /// is in progress, the lock should be held only for short periods of time to do
64 : /// reads/writes, not for the whole configuration process.
65 : pub state: Mutex<ComputeState>,
66 : /// `Condvar` to allow notifying waiters about state changes.
67 : pub state_changed: Condvar,
68 : /// The address of the extension storage proxy gateway.
69 : pub ext_remote_storage: Option<String>,
70 : // key: ext_archive_name, value: started download time, download_completed?
71 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
72 : pub build_tag: String,
73 : // connection string to pgbouncer to change settings
74 : pub pgbouncer_connstr: Option<String>,
75 : // path to pgbouncer.ini to change settings
76 : pub pgbouncer_ini_path: Option<String>,
77 : }
78 :
79 : // store some metrics about download size that might impact startup time
80 CBC 2 : #[derive(Clone, Debug)]
81 : pub struct RemoteExtensionMetrics {
82 : num_ext_downloaded: u64,
83 : largest_ext_size: u64,
84 : total_ext_download_size: u64,
85 : }
86 :
87 1305 : #[derive(Clone, Debug)]
88 : pub struct ComputeState {
89 : pub start_time: DateTime<Utc>,
90 : pub status: ComputeStatus,
91 : /// Timestamp of the last Postgres activity. It could be `None` if
92 : /// the compute hasn't been used since start.
93 : pub last_active: Option<DateTime<Utc>>,
94 : pub error: Option<String>,
95 : pub pspec: Option<ParsedSpec>,
96 : pub metrics: ComputeMetrics,
97 : }
98 :
99 : impl ComputeState {
100 544 : pub fn new() -> Self {
101 544 : Self {
102 544 : start_time: Utc::now(),
103 544 : status: ComputeStatus::Empty,
104 544 : last_active: None,
105 544 : error: None,
106 544 : pspec: None,
107 544 : metrics: ComputeMetrics::default(),
108 544 : }
109 544 : }
110 : }
111 :
112 : impl Default for ComputeState {
113 UBC 0 : fn default() -> Self {
114 0 : Self::new()
115 0 : }
116 : }
117 :
118 CBC 2016 : #[derive(Clone, Debug)]
119 : pub struct ParsedSpec {
120 : pub spec: ComputeSpec,
121 : pub tenant_id: TenantId,
122 : pub timeline_id: TimelineId,
123 : pub pageserver_connstr: String,
124 : pub safekeeper_connstrings: Vec<String>,
125 : pub storage_auth_token: Option<String>,
126 : }
127 :
128 : impl TryFrom<ComputeSpec> for ParsedSpec {
129 : type Error = String;
130 761 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
131 : // Extract the options from the spec file that are needed to connect to
132 : // the storage system.
133 : //
134 : // For backwards-compatibility, the top-level fields in the spec file
135 : // may be empty. In that case, we need to dig them from the GUCs in the
136 : // cluster.settings field.
137 761 : let pageserver_connstr = spec
138 761 : .pageserver_connstring
139 761 : .clone()
140 761 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
141 761 : .ok_or("pageserver connstr should be provided")?;
142 761 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
143 50 : if matches!(spec.mode, ComputeMode::Primary) {
144 UBC 0 : spec.cluster
145 0 : .settings
146 0 : .find("neon.safekeepers")
147 0 : .ok_or("safekeeper connstrings should be provided")?
148 0 : .split(',')
149 0 : .map(|str| str.to_string())
150 0 : .collect()
151 : } else {
152 CBC 50 : vec![]
153 : }
154 : } else {
155 711 : spec.safekeeper_connstrings.clone()
156 : };
157 761 : let storage_auth_token = spec.storage_auth_token.clone();
158 761 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
159 761 : tenant_id
160 : } else {
161 UBC 0 : spec.cluster
162 0 : .settings
163 0 : .find("neon.tenant_id")
164 0 : .ok_or("tenant id should be provided")
165 0 : .map(|s| TenantId::from_str(&s))?
166 0 : .or(Err("invalid tenant id"))?
167 : };
168 CBC 761 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
169 761 : timeline_id
170 : } else {
171 UBC 0 : spec.cluster
172 0 : .settings
173 0 : .find("neon.timeline_id")
174 0 : .ok_or("timeline id should be provided")
175 0 : .map(|s| TimelineId::from_str(&s))?
176 0 : .or(Err("invalid timeline id"))?
177 : };
178 :
179 CBC 761 : Ok(ParsedSpec {
180 761 : spec,
181 761 : pageserver_connstr,
182 761 : safekeeper_connstrings,
183 761 : storage_auth_token,
184 761 : tenant_id,
185 761 : timeline_id,
186 761 : })
187 761 : }
188 : }
189 :
190 : /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
191 : /// cgroup. Otherwise, returns the default `Command::new(cmd)`.
192 : ///
193 : /// This function should be used to start postgres, as it will start it in the
194 : /// neon-postgres cgroup if we are a VM. This allows autoscaling to control
195 : /// postgres' resource usage. The cgroup will exist in VMs because vm-builder
196 : /// creates it during the sysinit phase of its inittab.
197 1411 : fn maybe_cgexec(cmd: &str) -> Command {
198 1411 : // The cplane sets this env var for autoscaling computes.
199 1411 : // Use `var_os` so we don't have to worry about the variable being valid
200 1411 : // unicode. Should never be a concern... but just in case.
201 1411 : if env::var_os("AUTOSCALING").is_some() {
202 UBC 0 : let mut command = Command::new("cgexec");
203 0 : command.args(["-g", "memory:neon-postgres"]);
204 0 : command.arg(cmd);
205 0 : command
206 : } else {
207 CBC 1411 : Command::new(cmd)
208 : }
209 1411 : }
210 :
211 : /// Create the special `neon_superuser` role, a slightly nerfed version of a real
212 : /// superuser that we give to customers.
213 4 : fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
214 4 : let roles = spec
215 4 : .cluster
216 4 : .roles
217 4 : .iter()
218 4 : .map(|r| escape_literal(&r.name))
219 4 : .collect::<Vec<_>>();
220 4 :
221 4 : let dbs = spec
222 4 : .cluster
223 4 : .databases
224 4 : .iter()
225 4 : .map(|db| escape_literal(&db.name))
226 4 : .collect::<Vec<_>>();
227 :
228 4 : let roles_decl = if roles.is_empty() {
229 4 : String::from("roles text[] := NULL;")
230 : } else {
231 UBC 0 : format!(
232 0 : r#"
233 0 : roles text[] := ARRAY(SELECT rolname
234 0 : FROM pg_catalog.pg_roles
235 0 : WHERE rolname IN ({}));"#,
236 0 : roles.join(", ")
237 0 : )
238 : };
239 :
240 CBC 4 : let database_decl = if dbs.is_empty() {
241 4 : String::from("dbs text[] := NULL;")
242 : } else {
243 UBC 0 : format!(
244 0 : r#"
245 0 : dbs text[] := ARRAY(SELECT datname
246 0 : FROM pg_catalog.pg_database
247 0 : WHERE datname IN ({}));"#,
248 0 : dbs.join(", ")
249 0 : )
250 : };
251 :
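          : // Note: when the spec lists no roles / databases, the arrays above are
          : // declared as NULL, so the `array_length(...) IS NOT NULL` checks below
          : // evaluate to false and the corresponding GRANTs are skipped.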
252 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
253 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
254 CBC 4 : let query = format!(
255 4 : r#"
256 4 : DO $$
257 4 : DECLARE
258 4 : r text;
259 4 : {}
260 4 : {}
261 4 : BEGIN
262 4 : IF NOT EXISTS (
263 4 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
264 4 : THEN
265 4 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
266 4 : IF array_length(roles, 1) IS NOT NULL THEN
267 4 : EXECUTE format('GRANT neon_superuser TO %s',
268 4 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
269 4 : FOREACH r IN ARRAY roles LOOP
270 4 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
271 4 : END LOOP;
272 4 : END IF;
273 4 : IF array_length(dbs, 1) IS NOT NULL THEN
274 4 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
275 4 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
276 4 : END IF;
277 4 : END IF;
278 4 : END
279 4 : $$;"#,
280 4 : roles_decl, database_decl,
281 4 : );
282 4 : info!("Neon superuser created:\n{}", &query);
283 4 : client
284 4 : .simple_query(&query)
285 4 : .map_err(|e| anyhow::anyhow!(e).context(query))?;
286 4 : Ok(())
287 4 : }
288 :
289 : impl ComputeNode {
290 : /// Check that compute node has corresponding feature enabled.
291 UBC 0 : pub fn has_feature(&self, feature: ComputeFeature) -> bool {
292 0 : let state = self.state.lock().unwrap();
293 :
294 0 : if let Some(s) = state.pspec.as_ref() {
295 0 : s.spec.features.contains(&feature)
296 : } else {
297 0 : false
298 : }
299 0 : }
300 :
301 CBC 754 : pub fn set_status(&self, status: ComputeStatus) {
302 754 : let mut state = self.state.lock().unwrap();
303 754 : state.status = status;
304 754 : self.state_changed.notify_all();
305 754 : }
306 :
307 UBC 0 : pub fn get_status(&self) -> ComputeStatus {
308 0 : self.state.lock().unwrap().status
309 0 : }
310 :
311 : // Remove the `pgdata` directory and create it again with the right permissions.
312 CBC 544 : fn create_pgdata(&self) -> Result<()> {
313 544 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
314 544 : // If it is something different then create_dir() will error out anyway.
315 544 : let _ok = fs::remove_dir_all(&self.pgdata);
316 544 : fs::create_dir(&self.pgdata)?;
317 544 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
318 :
319 544 : Ok(())
320 544 : }
321 :
322 : // Get a basebackup from the pageserver over a libpq connection using `connstr` and
323 : // unarchive it into the `pgdata` directory, overwriting all its previous content.
324 544 : #[instrument(skip_all, fields(%lsn))]
325 : fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
326 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
327 : let start_time = Instant::now();
328 :
329 : let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
330 :
331 : // Use the storage auth token from the config file, if given.
332 : // Note: this overrides any password set in the connection string.
333 : if let Some(storage_auth_token) = &spec.storage_auth_token {
334 15 : info!("Got storage auth token from spec file");
335 : config.password(storage_auth_token);
336 : } else {
337 529 : info!("Storage auth token not set");
338 : }
339 :
340 : // Connect to pageserver
341 : let mut client = config.connect(NoTls)?;
342 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
343 :
344 : let basebackup_cmd = match lsn {
345 : // HACK We don't use compression on first start (Lsn(0)) because there's no API for it
346 : Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
347 : _ => format!(
348 : "basebackup {} {} {} --gzip",
349 : spec.tenant_id, spec.timeline_id, lsn
350 : ),
351 : };
352 :
353 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
354 : let mut measured_reader = MeasuredReader::new(copyreader);
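          : // MeasuredReader counts the bytes that stream through it, so we can report
          : // the basebackup size in the metrics below.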
355 :
356 : // Check the magic number to see if it's a gzip or not. Even though
357 : // we might explicitly ask for gzip, an old pageserver with no implementation
358 : // of gzip compression might send us uncompressed data. After some time
359 : // passes we can assume all pageservers know how to compress and we can
360 : // delete this check.
361 : //
362 : // If the data is not gzip, it will be tar. It will not be mistakenly
363 : // recognized as gzip because tar starts with an ASCII encoding of a filename,
364 : // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
365 : // we send the "global" directory first from the pageserver, so it definitely
366 : // won't be recognized as gzip.
367 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
368 : let gzip = {
369 : let peek = bufreader.fill_buf().unwrap();
370 : peek[0] == 0x1f && peek[1] == 0x8b
371 : };
372 :
373 : // Read the archive directly from the `CopyOutReader`
374 : //
375 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
376 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
377 : // sends an Error after finishing the tarball, we will not notice it.
378 : if gzip {
379 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
380 : ar.set_ignore_zeros(true);
381 : ar.unpack(&self.pgdata)?;
382 : } else {
383 : let mut ar = tar::Archive::new(&mut bufreader);
384 : ar.set_ignore_zeros(true);
385 : ar.unpack(&self.pgdata)?;
386 : };
387 :
388 : // Report metrics
389 : let mut state = self.state.lock().unwrap();
390 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
391 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
392 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
393 : Ok(())
394 : }
395 :
396 494 : pub async fn check_safekeepers_synced_async(
397 494 : &self,
398 494 : compute_state: &ComputeState,
399 494 : ) -> Result<Option<Lsn>> {
400 494 : // Construct a connection config for each safekeeper
401 494 : let pspec: ParsedSpec = compute_state
402 494 : .pspec
403 494 : .as_ref()
404 494 : .expect("spec must be set")
405 494 : .clone();
406 494 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
407 635 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
408 635 : // Format connstr
409 635 : let id = connstr.clone();
410 635 : let connstr = format!("postgresql://no_user@{}", connstr);
411 635 : let options = format!(
412 635 : "-c timeline_id={} tenant_id={}",
413 635 : pspec.timeline_id, pspec.tenant_id
414 635 : );
415 635 :
416 635 : // Construct client
417 635 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
418 635 : config.options(&options);
419 635 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
420 26 : config.password(storage_auth_token);
421 609 : }
422 :
423 635 : (id, config)
424 635 : });
425 494 :
426 494 : // Create task set to query all safekeepers
427 494 : let mut tasks = FuturesUnordered::new();
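          : // Majority quorum: e.g. with 3 safekeepers, 3 / 2 + 1 = 2 must respond.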
428 494 : let quorum = sk_configs.len() / 2 + 1;
429 1129 : for (id, config) in sk_configs {
430 635 : let timeout = tokio::time::Duration::from_millis(100);
431 635 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
432 635 : tasks.push(tokio::spawn(task));
433 635 : }
434 :
435 : // Get a quorum of responses or errors
436 494 : let mut responses = Vec::new();
437 494 : let mut join_errors = Vec::new();
438 494 : let mut task_errors = Vec::new();
439 494 : let mut timeout_errors = Vec::new();
440 1064 : while let Some(response) = tasks.next().await {
441 577 : match response {
442 566 : Ok(Ok(Ok(r))) => responses.push(r),
443 11 : Ok(Ok(Err(e))) => task_errors.push(e),
444 UBC 0 : Ok(Err(e)) => timeout_errors.push(e),
445 0 : Err(e) => join_errors.push(e),
446 : };
447 CBC 577 : if responses.len() >= quorum {
448 494 : break;
449 83 : }
450 83 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
451 UBC 0 : break;
452 CBC 83 : }
453 : }
454 :
455 : // In case of error, log and fail the check, but don't crash.
456 : // We're playing it safe because these errors could be transient
457 : // and we don't yet retry. Also being careful here allows us to
458 : // be backwards compatible with safekeepers that don't have the
459 : // TIMELINE_STATUS API yet.
460 494 : if responses.len() < quorum {
461 UBC 0 : error!(
462 0 : "failed sync safekeepers check {:?} {:?} {:?}",
463 0 : join_errors, task_errors, timeout_errors
464 0 : );
465 0 : return Ok(None);
466 CBC 494 : }
467 494 :
468 494 : Ok(check_if_synced(responses))
469 494 : }
470 :
471 : // Fast path for sync_safekeepers. If they're already synced we get the lsn
472 : // in one roundtrip. If not, we should do a full sync_safekeepers.
473 494 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
474 494 : let start_time = Utc::now();
475 494 :
476 494 : // Run actual work with new tokio runtime
477 494 : let rt = tokio::runtime::Builder::new_current_thread()
478 494 : .enable_all()
479 494 : .build()
480 494 : .expect("failed to create rt");
481 494 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
482 494 :
483 494 : // Record runtime
484 494 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
485 494 : .signed_duration_since(start_time)
486 494 : .to_std()
487 494 : .unwrap()
488 494 : .as_millis() as u64;
489 494 : result
490 494 : }
491 :
492 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
493 : // and return the reported LSN back to the caller.
494 874 : #[instrument(skip_all)]
495 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
496 : let start_time = Utc::now();
497 :
498 : let sync_handle = maybe_cgexec(&self.pgbin)
499 : .args(["--sync-safekeepers"])
500 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
501 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
502 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
503 : } else {
504 : vec![]
505 : })
506 : .stdout(Stdio::piped())
507 : .spawn()
508 : .expect("postgres --sync-safekeepers failed to start");
509 : SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
510 :
511 : // `postgres --sync-safekeepers` will print all log output to stderr and
512 : // the final LSN to stdout. So we pipe only stdout, while stderr will be automatically
513 : // redirected to the caller output.
514 : let sync_output = sync_handle
515 : .wait_with_output()
516 : .expect("postgres --sync-safekeepers failed");
517 : SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
518 :
519 : if !sync_output.status.success() {
520 : anyhow::bail!(
521 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
522 : sync_output.status,
523 : String::from_utf8(sync_output.stdout)
524 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
525 : );
526 : }
527 :
528 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
529 : .signed_duration_since(start_time)
530 : .to_std()
531 : .unwrap()
532 : .as_millis() as u64;
533 :
534 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
535 :
536 : Ok(lsn)
537 : }
538 :
539 : /// Do all the preparations like PGDATA directory creation, configuration,
540 : /// safekeepers sync, basebackup, etc.
541 544 : #[instrument(skip_all)]
542 : pub fn prepare_pgdata(
543 : &self,
544 : compute_state: &ComputeState,
545 : extension_server_port: u16,
546 : ) -> Result<()> {
547 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
548 : let spec = &pspec.spec;
549 : let pgdata_path = Path::new(&self.pgdata);
550 :
551 : // Remove/create an empty pgdata directory and put configuration there.
552 : self.create_pgdata()?;
553 : config::write_postgres_conf(
554 : &pgdata_path.join("postgresql.conf"),
555 : &pspec.spec,
556 : Some(extension_server_port),
557 : )?;
558 :
559 : // Syncing safekeepers is only safe with primary nodes: if a primary
560 : // is already connected it will be kicked out, so a secondary (standby)
561 : // cannot sync safekeepers.
562 : let lsn = match spec.mode {
563 : ComputeMode::Primary => {
564 494 : info!("checking if safekeepers are synced");
565 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
566 : lsn
567 : } else {
568 380 : info!("starting safekeepers syncing");
569 : self.sync_safekeepers(pspec.storage_auth_token.clone())
570 UBC 0 : .with_context(|| "failed to sync safekeepers")?
571 : };
572 CBC 494 : info!("safekeepers synced at LSN {}", lsn);
573 : lsn
574 : }
575 : ComputeMode::Static(lsn) => {
576 48 : info!("Starting read-only node at static LSN {}", lsn);
577 : lsn
578 : }
579 : ComputeMode::Replica => {
580 2 : info!("Initializing standby from latest Pageserver LSN");
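          : // Lsn(0) makes get_basebackup() omit the LSN from the basebackup command,
          : // so the pageserver serves its latest one.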
581 : Lsn(0)
582 : }
583 : };
584 :
585 544 : info!(
586 544 : "getting basebackup@{} from pageserver {}",
587 544 : lsn, &pspec.pageserver_connstr
588 544 : );
589 7 : self.get_basebackup(compute_state, lsn).with_context(|| {
590 7 : format!(
591 7 : "failed to get basebackup@{} from pageserver {}",
592 7 : lsn, &pspec.pageserver_connstr
593 7 : )
594 7 : })?;
595 :
596 : // Update pg_hba.conf received with basebackup.
597 : update_pg_hba(pgdata_path)?;
598 :
599 : match spec.mode {
600 : ComputeMode::Primary => {}
601 : ComputeMode::Replica | ComputeMode::Static(..) => {
602 : add_standby_signal(pgdata_path)?;
603 : }
604 : }
605 :
606 : Ok(())
607 : }
608 :
609 : /// Start and stop a postgres process to warm up the VM for startup.
610 UBC 0 : pub fn prewarm_postgres(&self) -> Result<()> {
611 0 : info!("prewarming");
612 :
613 : // Create pgdata
614 0 : let pgdata = &format!("{}.warmup", self.pgdata);
615 0 : create_pgdata(pgdata)?;
616 :
617 : // Run initdb to completion
618 0 : info!("running initdb");
619 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
620 0 : Command::new(initdb_bin)
621 0 : .args(["-D", pgdata])
622 0 : .output()
623 0 : .expect("cannot start initdb process");
624 0 :
625 0 : // Write conf
626 0 : use std::io::Write;
627 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
628 0 : let mut file = std::fs::File::create(conf_path)?;
629 0 : writeln!(file, "shared_buffers=65536")?;
630 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
631 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
632 :
633 : // Start postgres
634 0 : info!("starting postgres");
635 0 : let mut pg = maybe_cgexec(&self.pgbin)
636 0 : .args(["-D", pgdata])
637 0 : .spawn()
638 0 : .expect("cannot start postgres process");
639 0 :
640 0 : // Stop it when it's ready
641 0 : info!("waiting for postgres");
642 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
643 0 : pg.kill()?;
644 0 : info!("sent kill signal");
645 0 : pg.wait()?;
646 0 : info!("done prewarming");
647 :
648 : // clean up
649 0 : let _ok = fs::remove_dir_all(pgdata);
650 0 : Ok(())
651 0 : }
652 :
653 : /// Start Postgres as a child process and manage DBs/roles.
654 : /// After that this will hang waiting on the postmaster process to exit.
655 CBC 537 : #[instrument(skip_all)]
656 : pub fn start_postgres(
657 : &self,
658 : storage_auth_token: Option<String>,
659 : ) -> Result<std::process::Child> {
660 : let pgdata_path = Path::new(&self.pgdata);
661 :
662 : // Run postgres as a child process.
663 : let mut pg = maybe_cgexec(&self.pgbin)
664 : .args(["-D", &self.pgdata])
665 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
666 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
667 : } else {
668 : vec![]
669 : })
670 : .spawn()
671 : .expect("cannot start postgres process");
672 : PG_PID.store(pg.id(), Ordering::SeqCst);
673 :
674 : wait_for_postgres(&mut pg, pgdata_path)?;
675 :
676 : Ok(pg)
677 : }
678 :
679 : /// Do initial configuration of the already started Postgres.
680 4 : #[instrument(skip_all)]
681 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
682 : // If connection fails,
683 : // it may be the old node with `zenith_admin` superuser.
684 : //
685 : // In this case we need to connect with old `zenith_admin` name
686 : // and create new user. We cannot simply rename connected user,
687 : // but we can create a new one and grant it all privileges.
688 : let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
689 : Err(e) => {
690 UBC 0 : info!(
691 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
692 0 : e
693 0 : );
694 : let mut zenith_admin_connstr = self.connstr.clone();
695 :
696 : zenith_admin_connstr
697 : .set_username("zenith_admin")
698 0 : .map_err(|_| anyhow::anyhow!("invalid connstr"))?;
699 :
700 : let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
701 : // Disable forwarding so that users don't get a cloud_admin role
702 : client.simple_query("SET neon.forward_ddl = false")?;
703 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
704 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
705 : drop(client);
706 :
707 : // Reconnect with a connection string with the expected username.
708 : Client::connect(self.connstr.as_str(), NoTls)?
709 : }
710 : Ok(client) => client,
711 : };
712 :
713 : // Disable DDL forwarding because control plane already knows about these roles/databases.
714 : client.simple_query("SET neon.forward_ddl = false")?;
715 :
716 : // Proceed with post-startup configuration. Note, that order of operations is important.
717 : let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
718 : create_neon_superuser(spec, &mut client)?;
719 : cleanup_instance(&mut client)?;
720 : handle_roles(spec, &mut client)?;
721 : handle_databases(spec, &mut client)?;
722 : handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
723 : handle_grants(spec, &mut client, self.connstr.as_str())?;
724 : handle_extensions(spec, &mut client)?;
725 : handle_extension_neon(&mut client)?;
726 : create_availability_check_data(&mut client)?;
727 :
728 : // 'Close' connection
729 : drop(client);
730 :
731 : Ok(())
732 : }
733 :
734 : // We could've wrapped this around `pg_ctl reload`, but right now we don't use
735 : // `pg_ctl` for start / stop, so this just seems much easier to do as we already
736 : // have opened connection to Postgres and superuser access.
737 CBC 442 : #[instrument(skip_all)]
738 : fn pg_reload_conf(&self) -> Result<()> {
739 : let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
740 : Command::new(pgctl_bin)
741 : .args(["reload", "-D", &self.pgdata])
742 : .output()
743 : .expect("cannot run pg_ctl process");
744 : Ok(())
745 : }
746 :
747 : /// Similar to `apply_config()`, but performs a slightly different sequence of operations,
748 : /// as it's used to reconfigure a previously started and configured Postgres node.
749 217 : #[instrument(skip_all)]
750 : pub fn reconfigure(&self) -> Result<()> {
751 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
752 :
753 : if let Some(connstr) = &self.pgbouncer_connstr {
754 217 : info!("tuning pgbouncer with connstr: {:?}", connstr);
755 :
756 : let rt = tokio::runtime::Builder::new_current_thread()
757 : .enable_all()
758 : .build()
759 : .expect("failed to create rt");
760 :
761 : // Spawn a thread to do the tuning,
762 : // so that we don't block the main thread that starts Postgres.
763 : let pgbouncer_settings = spec.pgbouncer_settings.clone();
764 : let connstr_clone = connstr.clone();
765 : let pgbouncer_ini_path = self.pgbouncer_ini_path.clone();
766 217 : let _handle = thread::spawn(move || {
767 217 : let res = rt.block_on(tune_pgbouncer(
768 217 : pgbouncer_settings,
769 217 : &connstr_clone,
770 217 : pgbouncer_ini_path,
771 217 : ));
772 217 : if let Err(err) = res {
773 UBC 0 : error!("error while tuning pgbouncer: {err:?}");
774 CBC 217 : }
775 217 : });
776 : }
777 :
778 : // Write new config
779 : let pgdata_path = Path::new(&self.pgdata);
780 : let postgresql_conf_path = pgdata_path.join("postgresql.conf");
781 : config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
782 : // temporarily reset max_cluster_size in config
783 : // to avoid the possibility of hitting the limit, while we are reconfiguring:
784 : // creating new extensions, roles, etc...
785 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
786 : self.pg_reload_conf()?;
787 :
788 : let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
789 :
790 : // Proceed with post-startup configuration. Note, that order of operations is important.
791 : // Disable DDL forwarding because control plane already knows about these roles/databases.
792 : if spec.mode == ComputeMode::Primary {
793 : client.simple_query("SET neon.forward_ddl = false")?;
794 : cleanup_instance(&mut client)?;
795 : handle_roles(&spec, &mut client)?;
796 : handle_databases(&spec, &mut client)?;
797 : handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
798 : handle_grants(&spec, &mut client, self.connstr.as_str())?;
799 : handle_extensions(&spec, &mut client)?;
800 : handle_extension_neon(&mut client)?;
801 : }
802 :
803 : // 'Close' connection
804 : drop(client);
805 :
806 : // reset max_cluster_size in config back to original value and reload config
807 : config::compute_ctl_temp_override_remove(pgdata_path)?;
808 : self.pg_reload_conf()?;
809 :
810 : let unknown_op = "unknown".to_string();
811 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
812 217 : info!(
813 217 : "finished reconfiguration of compute node for operation {}",
814 217 : op_id
815 217 : );
816 :
817 : Ok(())
818 : }
819 :
820 544 : #[instrument(skip_all)]
821 : pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
822 : let compute_state = self.state.lock().unwrap().clone();
823 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
824 544 : info!(
825 544 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
826 544 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
827 544 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
828 544 : pspec.tenant_id,
829 544 : pspec.timeline_id,
830 544 : );
831 :
832 : // tune pgbouncer
833 : if let Some(connstr) = &self.pgbouncer_connstr {
834 544 : info!("tuning pgbouncer with connstr: {:?}", connstr);
835 :
836 : let rt = tokio::runtime::Builder::new_current_thread()
837 : .enable_all()
838 : .build()
839 : .expect("failed to create rt");
840 :
841 : // Spawn a thread to do the tuning,
842 : // so that we don't block the main thread that starts Postgres.
843 : let pgbouncer_settings = pspec.spec.pgbouncer_settings.clone();
844 : let connstr_clone = connstr.clone();
845 : let pgbouncer_ini_path = self.pgbouncer_ini_path.clone();
846 544 : let _handle = thread::spawn(move || {
847 544 : let res = rt.block_on(tune_pgbouncer(
848 544 : pgbouncer_settings,
849 544 : &connstr_clone,
850 544 : pgbouncer_ini_path,
851 544 : ));
852 544 : if let Err(err) = res {
853 UBC 0 : error!("error while tuning pgbouncer: {err:?}");
854 CBC 544 : }
855 544 : });
856 : }
857 :
858 544 : info!(
859 544 : "start_compute spec.remote_extensions {:?}",
860 544 : pspec.spec.remote_extensions
861 544 : );
862 :
863 : // This part is sync, because we need to download any
864 : // remote shared_preload_libraries before postgres starts.
865 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
866 : // First, create control files for all available extensions
867 : extension_server::create_control_files(remote_extensions, &self.pgbin);
868 :
869 : let library_load_start_time = Utc::now();
870 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
871 :
872 : let library_load_time = Utc::now()
873 : .signed_duration_since(library_load_start_time)
874 : .to_std()
875 : .unwrap()
876 : .as_millis() as u64;
877 : let mut state = self.state.lock().unwrap();
878 : state.metrics.load_ext_ms = library_load_time;
879 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
880 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
881 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
882 1 : info!(
883 1 : "Loading shared_preload_libraries took {:?}ms",
884 1 : library_load_time
885 1 : );
886 1 : info!("{:?}", remote_ext_metrics);
887 : }
888 :
889 : self.prepare_pgdata(&compute_state, extension_server_port)?;
890 :
891 : let start_time = Utc::now();
892 : let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
893 :
894 : let config_time = Utc::now();
895 : if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
896 : let pgdata_path = Path::new(&self.pgdata);
897 : // temporarily reset max_cluster_size in config
898 : // to avoid the possibility of hitting the limit, while we are applying config:
899 : // creating new extensions, roles, etc...
900 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
901 : self.pg_reload_conf()?;
902 :
903 : self.apply_config(&compute_state)?;
904 :
905 : config::compute_ctl_temp_override_remove(pgdata_path)?;
906 : self.pg_reload_conf()?;
907 : }
908 :
909 : let startup_end_time = Utc::now();
910 : {
911 : let mut state = self.state.lock().unwrap();
912 : state.metrics.start_postgres_ms = config_time
913 : .signed_duration_since(start_time)
914 : .to_std()
915 : .unwrap()
916 : .as_millis() as u64;
917 : state.metrics.config_ms = startup_end_time
918 : .signed_duration_since(config_time)
919 : .to_std()
920 : .unwrap()
921 : .as_millis() as u64;
922 : state.metrics.total_startup_ms = startup_end_time
923 : .signed_duration_since(compute_state.start_time)
924 : .to_std()
925 : .unwrap()
926 : .as_millis() as u64;
927 : }
928 : self.set_status(ComputeStatus::Running);
929 :
930 537 : info!(
931 537 : "finished configuration of compute for project {}",
932 537 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
933 537 : );
934 :
935 : // Log metrics so that we can search for slow operations in logs
936 : let metrics = {
937 : let state = self.state.lock().unwrap();
938 : state.metrics.clone()
939 : };
940 537 : info!(?metrics, "compute start finished");
941 :
942 : Ok(pg)
943 : }
944 :
945 : // Look for core dumps and collect backtraces.
946 : //
947 : // EKS worker nodes have the following core dump settings:
948 : // /proc/sys/kernel/core_pattern -> core
949 : // /proc/sys/kernel/core_uses_pid -> 1
950 : // ulimit -c -> unlimited
951 : // which results in core dumps being written to the postgres data directory as core.<pid>.
952 : //
953 : // Use that as the default location and pattern, except on macOS, where core dumps are
954 : // written to the /cores/ directory by default.
955 540 : pub fn check_for_core_dumps(&self) -> Result<()> {
956 540 : let core_dump_dir = match std::env::consts::OS {
957 540 : "macos" => Path::new("/cores/"),
958 540 : _ => Path::new(&self.pgdata),
959 : };
960 :
961 : // Collect core dump paths if any
962 540 : info!("checking for core dumps in {}", core_dump_dir.display());
963 540 : let files = fs::read_dir(core_dump_dir)?;
964 12863 : let cores = files.filter_map(|entry| {
965 12863 : let entry = entry.ok()?;
966 12863 : let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
967 UBC 0 : Some(entry.path())
968 CBC 12863 : });
969 :
970 : // Print backtrace for each core dump
971 540 : for core_path in cores {
972 UBC 0 : warn!(
973 0 : "core dump found: {}, collecting backtrace",
974 0 : core_path.display()
975 0 : );
976 :
977 : // Try first with gdb
978 0 : let backtrace = Command::new("gdb")
979 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
980 0 : .arg(&core_path)
981 0 : .output();
982 :
983 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
984 0 : let backtrace = match backtrace {
985 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
986 0 : warn!("cannot find gdb, trying lldb");
987 0 : Command::new("lldb")
988 0 : .arg("-c")
989 0 : .arg(&core_path)
990 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
991 0 : .output()
992 : }
993 0 : _ => backtrace,
994 0 : }?;
995 :
996 0 : warn!(
997 0 : "core dump backtrace: {}",
998 0 : String::from_utf8_lossy(&backtrace.stdout)
999 0 : );
1000 0 : warn!(
1001 0 : "debugger stderr: {}",
1002 0 : String::from_utf8_lossy(&backtrace.stderr)
1003 0 : );
1004 : }
1005 :
1006 CBC 540 : Ok(())
1007 540 : }
1008 :
1009 : /// Select `pg_stat_statements` data and return it as stringified JSON
1010 UBC 0 : pub async fn collect_insights(&self) -> String {
1011 0 : let mut result_rows: Vec<String> = Vec::new();
1012 0 : let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
1013 0 : let (client, connection) = connect_result.unwrap();
1014 0 : tokio::spawn(async move {
1015 0 : if let Err(e) = connection.await {
1016 0 : eprintln!("connection error: {}", e);
1017 0 : }
1018 0 : });
1019 0 : let result = client
1020 0 : .simple_query(
1021 0 : "SELECT
1022 0 : row_to_json(pg_stat_statements)
1023 0 : FROM
1024 0 : pg_stat_statements
1025 0 : WHERE
1026 0 : userid != 'cloud_admin'::regrole::oid
1027 0 : ORDER BY
1028 0 : (mean_exec_time + mean_plan_time) DESC
1029 0 : LIMIT 100",
1030 0 : )
1031 0 : .await;
1032 :
1033 0 : if let Ok(raw_rows) = result {
1034 0 : for message in raw_rows.iter() {
1035 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
1036 0 : if let Some(json) = row.get(0) {
1037 0 : result_rows.push(json.to_string());
1038 0 : }
1039 0 : }
1040 : }
1041 :
1042 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
1043 : } else {
1044 0 : "{{\"pg_stat_statements\": []}}".to_string()
1045 : }
1046 0 : }
1047 :
1048 : // Download an archive, unzip it, and place the files in the correct locations.
1049 CBC 2 : pub async fn download_extension(
1050 2 : &self,
1051 2 : real_ext_name: String,
1052 2 : ext_path: RemotePath,
1053 2 : ) -> Result<u64, DownloadError> {
1054 2 : let ext_remote_storage =
1055 2 : self.ext_remote_storage
1056 2 : .as_ref()
1057 2 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
1058 2 : "Remote extensions storage is not configured",
1059 2 : )))?;
1060 :
1061 2 : let ext_archive_name = ext_path.object_name().expect("bad path");
1062 2 :
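          : // Concurrent downloads coordinate through `ext_download_progress`: the first
          : // caller records a start timestamp; later callers either wait for it to
          : // finish (up to HANG_TIMEOUT) or, after that, retry the download themselves.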
1063 2 : let mut first_try = false;
1064 2 : if !self
1065 2 : .ext_download_progress
1066 2 : .read()
1067 2 : .expect("lock err")
1068 2 : .contains_key(ext_archive_name)
1069 1 : {
1070 1 : self.ext_download_progress
1071 1 : .write()
1072 1 : .expect("lock err")
1073 1 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
1074 1 : first_try = true;
1075 1 : }
1076 2 : let (download_start, download_completed) =
1077 2 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
1078 2 : let start_time_delta = Utc::now()
1079 2 : .signed_duration_since(download_start)
1080 2 : .to_std()
1081 2 : .unwrap()
1082 2 : .as_millis() as u64;
1083 2 :
1084 2 : // how long to wait for extension download if it was started by another process
1085 2 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
1086 2 :
1087 2 : if download_completed {
1088 1 : info!("extension already downloaded, skipping re-download");
1089 1 : return Ok(0);
1090 1 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
1091 UBC 0 : info!("download {ext_archive_name} already started by another process, hanging until completion or timeout");
1092 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
1093 : loop {
1094 0 : info!("waiting for download");
1095 0 : interval.tick().await;
1096 0 : let (_, download_completed_now) =
1097 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
1098 0 : if download_completed_now {
1099 0 : info!("download finished by whoever else downloaded it");
1100 0 : return Ok(0);
1101 0 : }
1102 : }
1103 : // NOTE: the above loop will get terminated
1104 : // based on the timeout of the download function
1105 CBC 1 : }
1106 :
1107 : // If the extension hasn't been downloaded before, or the previous
1108 : // attempt to download it was at least HANG_TIMEOUT ms ago,
1109 : // then we try to download it here.
1110 1 : info!("downloading new extension {ext_archive_name}");
1111 :
1112 1 : let download_size = extension_server::download_extension(
1113 1 : &real_ext_name,
1114 1 : &ext_path,
1115 1 : ext_remote_storage,
1116 1 : &self.pgbin,
1117 1 : )
1118 58 : .await
1119 1 : .map_err(DownloadError::Other);
1120 1 :
1121 1 : self.ext_download_progress
1122 1 : .write()
1123 1 : .expect("bad lock")
1124 1 : .insert(ext_archive_name.to_string(), (download_start, true));
1125 1 :
1126 1 : download_size
1127 2 : }
1128 :
1129 : #[tokio::main]
1130 1 : pub async fn prepare_preload_libraries(
1131 1 : &self,
1132 1 : spec: &ComputeSpec,
1133 1 : ) -> Result<RemoteExtensionMetrics> {
1134 1 : if self.ext_remote_storage.is_none() {
1135 UBC 0 : return Ok(RemoteExtensionMetrics {
1136 0 : num_ext_downloaded: 0,
1137 0 : largest_ext_size: 0,
1138 0 : total_ext_download_size: 0,
1139 0 : });
1140 CBC 1 : }
1141 1 : let remote_extensions = spec
1142 1 : .remote_extensions
1143 1 : .as_ref()
1144 1 : .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
1145 :
1146 1 : info!("parse shared_preload_libraries from spec.cluster.settings");
1147 1 : let mut libs_vec = Vec::new();
1148 1 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1149 UBC 0 : libs_vec = libs
1150 0 : .split(&[',', '\'', ' '])
1151 0 : .filter(|s| *s != "neon" && !s.is_empty())
1152 0 : .map(str::to_string)
1153 0 : .collect();
1154 CBC 1 : }
1155 1 : info!("parse shared_preload_libraries from provided postgresql.conf");
1156 :
1157 : // That is used in neon_local and Python tests.
1158 1 : if let Some(conf) = &spec.cluster.postgresql_conf {
1159 1 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1160 1 : let mut shared_preload_libraries_line = "";
1161 22 : for line in conf_lines {
1162 21 : if line.starts_with("shared_preload_libraries") {
1163 1 : shared_preload_libraries_line = line;
1164 20 : }
1165 : }
1166 1 : let mut preload_libs_vec = Vec::new();
1167 1 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1168 UBC 0 : preload_libs_vec = libs
1169 0 : .split(&[',', '\'', ' '])
1170 0 : .filter(|s| *s != "neon" && !s.is_empty())
1171 0 : .map(str::to_string)
1172 0 : .collect();
1173 CBC 1 : }
1174 1 : libs_vec.extend(preload_libs_vec);
1175 UBC 0 : }
1176 :
1177 : // Don't try to download libraries that are not in the index.
1178 : // Assume that they are already present locally.
1179 CBC 1 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1180 1 :
1181 1 : info!("Downloading to shared preload libraries: {:?}", &libs_vec);
1182 :
1183 1 : let mut download_tasks = Vec::new();
1184 1 : for library in &libs_vec {
1185 UBC 0 : let (ext_name, ext_path) =
1186 0 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1187 0 : download_tasks.push(self.download_extension(ext_name, ext_path));
1188 : }
1189 CBC 1 : let results = join_all(download_tasks).await;
1190 :
1191 1 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1192 1 : num_ext_downloaded: 0,
1193 1 : largest_ext_size: 0,
1194 1 : total_ext_download_size: 0,
1195 1 : };
1196 1 : for result in results {
1197 UBC 0 : let download_size = match result {
1198 0 : Ok(res) => {
1199 0 : remote_ext_metrics.num_ext_downloaded += 1;
1200 0 : res
1201 : }
1202 0 : Err(err) => {
1203 0 : // if we failed to download an extension, we don't want to fail the whole
1204 0 : // process, but we do want to log the error
1205 0 : error!("Failed to download extension: {}", err);
1206 0 : 0
1207 : }
1208 : };
1209 :
1210 0 : remote_ext_metrics.largest_ext_size =
1211 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1212 0 : remote_ext_metrics.total_ext_download_size += download_size;
1213 : }
1214 CBC 1 : Ok(remote_ext_metrics)
1215 : }
1216 : }