Line data Source code
1 : use std::collections::HashMap;
2 : use std::env;
3 : use std::fs;
4 : use std::io::BufRead;
5 : use std::os::unix::fs::{symlink, PermissionsExt};
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::atomic::AtomicU32;
10 : use std::sync::atomic::Ordering;
11 : use std::sync::{Condvar, Mutex, RwLock};
12 : use std::thread;
13 : use std::time::Instant;
14 :
15 : use anyhow::{Context, Result};
16 : use chrono::{DateTime, Utc};
17 : use futures::future::join_all;
18 : use futures::stream::FuturesUnordered;
19 : use futures::StreamExt;
20 : use postgres::{Client, NoTls};
21 : use tokio;
22 : use tokio_postgres;
23 : use tracing::{debug, error, info, instrument, warn};
24 : use utils::id::{TenantId, TimelineId};
25 : use utils::lsn::Lsn;
26 :
27 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
28 : use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
29 : use utils::measured_stream::MeasuredReader;
30 :
31 : use remote_storage::{DownloadError, RemotePath};
32 :
33 : use crate::checker::create_availability_check_data;
34 : use crate::logger::inlinify;
35 : use crate::pg_helpers::*;
36 : use crate::spec::*;
37 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
38 : use crate::{config, extension_server};
39 :
40 : pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
41 : pub static PG_PID: AtomicU32 = AtomicU32::new(0);
42 :
43 : /// Compute node info shared across several `compute_ctl` threads.
44 : pub struct ComputeNode {
45 : // Url type maintains proper escaping
46 : pub connstr: url::Url,
47 : pub pgdata: String,
48 : pub pgbin: String,
49 : pub pgversion: String,
50 : /// We should only allow live re-/configuration of the compute node if
51 : /// it uses the 'pull model', i.e. it can go to the control plane and fetch
52 : /// the latest configuration. Otherwise, there could be a case like this:
53 : /// - we start a compute with some spec provided as an argument
54 : /// - we push a new spec and it does a reconfiguration
55 : /// - but then something happens and the compute pod / VM is destroyed,
56 : /// so the k8s controller starts it again with the **old** spec
57 : /// The same applies to empty computes:
58 : /// - we start a compute without any spec
59 : /// - we push a spec and it does the configuration
60 : /// - but then it is restarted without any spec again
61 : pub live_config_allowed: bool,
62 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
63 : /// To allow the HTTP API server to serve status requests while configuration
64 : /// is in progress, the lock should be held only for short periods of time to
65 : /// do a read/write, not for the whole configuration process.
66 : pub state: Mutex<ComputeState>,
67 : /// `Condvar` to allow notifying waiters about state changes.
68 : pub state_changed: Condvar,
69 : /// The address of the extension storage proxy gateway.
70 : pub ext_remote_storage: Option<String>,
71 : // Key: ext_archive_name; value: (download start time, download_completed flag).
72 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
73 : pub build_tag: String,
74 : }
75 :
76 : // store some metrics about download size that might impact startup time
77 2 : #[derive(Clone, Debug)]
78 : pub struct RemoteExtensionMetrics {
79 : num_ext_downloaded: u64,
80 : largest_ext_size: u64,
81 : total_ext_download_size: u64,
82 : }
83 :
84 1373 : #[derive(Clone, Debug)]
85 : pub struct ComputeState {
86 : pub start_time: DateTime<Utc>,
87 : pub status: ComputeStatus,
88 : /// Timestamp of the last Postgres activity. It could be `None` if the
89 : /// compute hasn't been used since start.
90 : pub last_active: Option<DateTime<Utc>>,
91 : pub error: Option<String>,
92 : pub pspec: Option<ParsedSpec>,
93 : pub metrics: ComputeMetrics,
94 : }
95 :
96 : impl ComputeState {
97 571 : pub fn new() -> Self {
98 571 : Self {
99 571 : start_time: Utc::now(),
100 571 : status: ComputeStatus::Empty,
101 571 : last_active: None,
102 571 : error: None,
103 571 : pspec: None,
104 571 : metrics: ComputeMetrics::default(),
105 571 : }
106 571 : }
107 : }
108 :
109 : impl Default for ComputeState {
110 0 : fn default() -> Self {
111 0 : Self::new()
112 0 : }
113 : }
114 :
115 2129 : #[derive(Clone, Debug)]
116 : pub struct ParsedSpec {
117 : pub spec: ComputeSpec,
118 : pub tenant_id: TenantId,
119 : pub timeline_id: TimelineId,
120 : pub pageserver_connstr: String,
121 : pub safekeeper_connstrings: Vec<String>,
122 : pub storage_auth_token: Option<String>,
123 : }
124 :
125 : impl TryFrom<ComputeSpec> for ParsedSpec {
126 : type Error = String;
127 802 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
128 : // Extract the options from the spec file that are needed to connect to
129 : // the storage system.
130 : //
131 : // For backwards-compatibility, the top-level fields in the spec file
132 : // may be empty. In that case, we need to dig them from the GUCs in the
133 : // cluster.settings field.
134 802 : let pageserver_connstr = spec
135 802 : .pageserver_connstring
136 802 : .clone()
137 802 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
138 802 : .ok_or("pageserver connstr should be provided")?;
139 802 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
140 46 : if matches!(spec.mode, ComputeMode::Primary) {
141 0 : spec.cluster
142 0 : .settings
143 0 : .find("neon.safekeepers")
144 0 : .ok_or("safekeeper connstrings should be provided")?
145 0 : .split(',')
146 0 : .map(|str| str.to_string())
147 0 : .collect()
148 : } else {
149 46 : vec![]
150 : }
151 : } else {
152 756 : spec.safekeeper_connstrings.clone()
153 : };
154 802 : let storage_auth_token = spec.storage_auth_token.clone();
155 802 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
156 802 : tenant_id
157 : } else {
158 0 : spec.cluster
159 0 : .settings
160 0 : .find("neon.tenant_id")
161 0 : .ok_or("tenant id should be provided")
162 0 : .map(|s| TenantId::from_str(&s))?
163 0 : .or(Err("invalid tenant id"))?
164 : };
165 802 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
166 802 : timeline_id
167 : } else {
168 0 : spec.cluster
169 0 : .settings
170 0 : .find("neon.timeline_id")
171 0 : .ok_or("timeline id should be provided")
172 0 : .map(|s| TimelineId::from_str(&s))?
173 0 : .or(Err("invalid timeline id"))?
174 : };
175 :
176 802 : Ok(ParsedSpec {
177 802 : spec,
178 802 : pageserver_connstr,
179 802 : safekeeper_connstrings,
180 802 : storage_auth_token,
181 802 : tenant_id,
182 802 : timeline_id,
183 802 : })
184 802 : }
185 : }
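// An illustrative sketch (hypothetical values) of the backwards-compatibility
// fallback above: a spec with no top-level `pageserver_connstring` still
// parses if equivalent GUCs are present in `cluster.settings`, e.g.
//
//     neon.pageserver_connstring = 'postgresql://no_user@pageserver:6400'
//     neon.tenant_id = '<tenant id hex>'
//     neon.timeline_id = '<timeline id hex>'
//     neon.safekeepers = 'sk1:5454,sk2:5454,sk3:5454'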
186 :
187 : /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
188 : /// cgroup. Otherwise returns the default `Command::new(cmd)`
189 : ///
190 : /// This function should be used to start postgres, as it will start it in the
191 : /// neon-postgres cgroup if we are a VM. This allows autoscaling to control
192 : /// postgres' resource usage. The cgroup will exist in VMs because vm-builder
193 : /// creates it during the sysinit phase of its inittab.
194 1500 : fn maybe_cgexec(cmd: &str) -> Command {
195 1500 : // The cplane sets this env var for autoscaling computes.
196 1500 : // Use `var_os` so we don't have to worry about whether the variable is
197 1500 : // valid unicode. It should never be a concern . . . but just in case.
198 1500 : if env::var_os("AUTOSCALING").is_some() {
199 0 : let mut command = Command::new("cgexec");
200 0 : command.args(["-g", "memory:neon-postgres"]);
201 0 : command.arg(cmd);
202 0 : command
203 : } else {
204 1500 : Command::new(cmd)
205 : }
206 1500 : }
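// A minimal sketch of what the two branches above produce for
// `maybe_cgexec("/usr/local/bin/postgres")` (hypothetical path):
//
//     AUTOSCALING set:   cgexec -g memory:neon-postgres /usr/local/bin/postgres
//     AUTOSCALING unset: /usr/local/bin/postgres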
207 :
208 : /// Create the special `neon_superuser` role, a slightly nerfed version of a real
209 : /// superuser that we give to customers.
210 8 : #[instrument(skip_all)]
211 : fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
212 : let roles = spec
213 : .cluster
214 : .roles
215 : .iter()
216 0 : .map(|r| escape_literal(&r.name))
217 : .collect::<Vec<_>>();
218 :
219 : let dbs = spec
220 : .cluster
221 : .databases
222 : .iter()
223 0 : .map(|db| escape_literal(&db.name))
224 : .collect::<Vec<_>>();
225 :
226 : let roles_decl = if roles.is_empty() {
227 : String::from("roles text[] := NULL;")
228 : } else {
229 : format!(
230 : r#"
231 : roles text[] := ARRAY(SELECT rolname
232 : FROM pg_catalog.pg_roles
233 : WHERE rolname IN ({}));"#,
234 : roles.join(", ")
235 : )
236 : };
237 :
238 : let database_decl = if dbs.is_empty() {
239 : String::from("dbs text[] := NULL;")
240 : } else {
241 : format!(
242 : r#"
243 : dbs text[] := ARRAY(SELECT datname
244 : FROM pg_catalog.pg_database
245 : WHERE datname IN ({}));"#,
246 : dbs.join(", ")
247 : )
248 : };
249 :
250 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
251 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
252 : let query = format!(
253 : r#"
254 : DO $$
255 : DECLARE
256 : r text;
257 : {}
258 : {}
259 : BEGIN
260 : IF NOT EXISTS (
261 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
262 : THEN
263 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
264 : IF array_length(roles, 1) IS NOT NULL THEN
265 : EXECUTE format('GRANT neon_superuser TO %s',
266 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
267 : FOREACH r IN ARRAY roles LOOP
268 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
269 : END LOOP;
270 : END IF;
271 : IF array_length(dbs, 1) IS NOT NULL THEN
272 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
273 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
274 : END IF;
275 : END IF;
276 : END
277 : $$;"#,
278 : roles_decl, database_decl,
279 : );
280 8 : info!("Neon superuser created: {}", inlinify(&query));
281 : client
282 : .simple_query(&query)
283 0 : .map_err(|e| anyhow::anyhow!(e).context(query))?;
284 : Ok(())
285 : }
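// For illustration (hypothetical role names, and assuming `escape_literal`
// produces standard single-quoted SQL literals), a spec with roles
// ["alice", "bob"] renders `roles_decl` above as:
//
//     roles text[] := ARRAY(SELECT rolname
//            FROM pg_catalog.pg_roles
//            WHERE rolname IN ('alice', 'bob'));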
286 :
287 : impl ComputeNode {
288 : /// Check whether the compute node has the corresponding feature enabled.
289 12375 : pub fn has_feature(&self, feature: ComputeFeature) -> bool {
290 12375 : let state = self.state.lock().unwrap();
291 :
292 12375 : if let Some(s) = state.pspec.as_ref() {
293 12375 : s.spec.features.contains(&feature)
294 : } else {
295 0 : false
296 : }
297 12375 : }
298 :
299 802 : pub fn set_status(&self, status: ComputeStatus) {
300 802 : let mut state = self.state.lock().unwrap();
301 802 : state.status = status;
302 802 : self.state_changed.notify_all();
303 802 : }
304 :
305 0 : pub fn get_status(&self) -> ComputeStatus {
306 0 : self.state.lock().unwrap().status
307 0 : }
308 :
309 : // Remove the `pgdata` directory and create it again with the right permissions.
310 571 : fn create_pgdata(&self) -> Result<()> {
311 571 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
312 571 : // If it is something different, create_dir() will error out anyway.
313 571 : let _ok = fs::remove_dir_all(&self.pgdata);
314 571 : fs::create_dir(&self.pgdata)?;
315 571 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
316 :
317 571 : Ok(())
318 571 : }
319 :
320 : // Get a basebackup from the pageserver over the libpq connection using `connstr`,
321 : // and unpack it into the `pgdata` directory, replacing all of its previous content.
322 572 : #[instrument(skip_all, fields(%lsn))]
323 : fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
324 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
325 : let start_time = Instant::now();
326 :
327 : let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
328 :
329 : // Use the storage auth token from the config file, if given.
330 : // Note: this overrides any password set in the connection string.
331 : if let Some(storage_auth_token) = &spec.storage_auth_token {
332 15 : info!("Got storage auth token from spec file");
333 : config.password(storage_auth_token);
334 : } else {
335 557 : info!("Storage auth token not set");
336 : }
337 :
338 : // Connect to pageserver
339 : let mut client = config.connect(NoTls)?;
340 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
341 :
342 : let basebackup_cmd = match lsn {
343 : // HACK We don't use compression on first start (Lsn(0)) because there's no API for it
344 : Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
345 : _ => format!(
346 : "basebackup {} {} {} --gzip",
347 : spec.tenant_id, spec.timeline_id, lsn
348 : ),
349 : };
350 :
351 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
352 : let mut measured_reader = MeasuredReader::new(copyreader);
353 :
354 : // Check the magic number to see if it's a gzip or not. Even though
355 : // we might explicitly ask for gzip, an old pageserver with no implementation
356 : // of gzip compression might send us uncompressed data. After some time
357 : // passes we can assume all pageservers know how to compress and we can
358 : // delete this check.
359 : //
360 : // If the data is not gzip, it will be tar. It will not be mistakenly
361 : // recognized as gzip because tar starts with an ascii encoding of a filename,
362 : // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
363 : // we send the "global" directory first from the pageserver, so it definitely
364 : // won't be recognized as gzip.
365 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
366 : let gzip = {
367 : let peek = bufreader.fill_buf().unwrap();
368 : peek[0] == 0x1f && peek[1] == 0x8b
369 : };
370 :
371 : // Read the archive directly from the `CopyOutReader`
372 : //
373 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
374 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
375 : // sends an Error after finishing the tarball, we will not notice it.
376 : if gzip {
377 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
378 : ar.set_ignore_zeros(true);
379 : ar.unpack(&self.pgdata)?;
380 : } else {
381 : let mut ar = tar::Archive::new(&mut bufreader);
382 : ar.set_ignore_zeros(true);
383 : ar.unpack(&self.pgdata)?;
384 : };
385 :
386 : // Report metrics
387 : let mut state = self.state.lock().unwrap();
388 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
389 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
390 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
391 : Ok(())
392 : }
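// A standalone sketch (not called by the code above) of the same gzip
// detection: a gzip stream always begins with the magic bytes 0x1f 0x8b,
// so peeking at the first two bytes distinguishes it from a plain tar:
//
//     fn looks_like_gzip(peek: &[u8]) -> bool {
//         peek.len() >= 2 && peek[0] == 0x1f && peek[1] == 0x8b
//     }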
393 :
394 : // Gets the basebackup in a retry loop
395 571 : #[instrument(skip_all, fields(%lsn))]
396 : pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
397 : let mut retry_period_ms = 500;
398 : let mut attempts = 0;
399 : let max_attempts = 5;
400 : loop {
401 : let result = self.try_get_basebackup(compute_state, lsn);
402 : match result {
403 : Ok(_) => {
404 : return result;
405 : }
406 : Err(ref e) if attempts < max_attempts => {
407 1 : warn!(
408 1 : "Failed to get basebackup: {} (attempt {}/{})",
409 1 : e, attempts, max_attempts
410 1 : );
411 : std::thread::sleep(std::time::Duration::from_millis(retry_period_ms));
412 : retry_period_ms *= 2;
413 : }
414 : Err(_) => {
415 : return result;
416 : }
417 : }
418 : attempts += 1;
419 : }
420 : }
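// With the parameters above (500 ms initial period, doubled after each
// failure, `max_attempts = 5`), the loop makes up to six attempts in total,
// sleeping 500, 1000, 2000, 4000 and 8000 ms between them before giving up
// and returning the last error.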
421 :
422 525 : pub async fn check_safekeepers_synced_async(
423 525 : &self,
424 525 : compute_state: &ComputeState,
425 525 : ) -> Result<Option<Lsn>> {
426 525 : // Construct a connection config for each safekeeper
427 525 : let pspec: ParsedSpec = compute_state
428 525 : .pspec
429 525 : .as_ref()
430 525 : .expect("spec must be set")
431 525 : .clone();
432 525 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
433 666 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
434 666 : // Format connstr
435 666 : let id = connstr.clone();
436 666 : let connstr = format!("postgresql://no_user@{}", connstr);
437 666 : let options = format!(
438 666 : "-c timeline_id={} tenant_id={}",
439 666 : pspec.timeline_id, pspec.tenant_id
440 666 : );
441 666 :
442 666 : // Construct client
443 666 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
444 666 : config.options(&options);
445 666 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
446 26 : config.password(storage_auth_token);
447 640 : }
448 :
449 666 : (id, config)
450 666 : });
451 525 :
452 525 : // Create task set to query all safekeepers
453 525 : let mut tasks = FuturesUnordered::new();
454 525 : let quorum = sk_configs.len() / 2 + 1;
455 1191 : for (id, config) in sk_configs {
456 666 : let timeout = tokio::time::Duration::from_millis(100);
457 666 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
458 666 : tasks.push(tokio::spawn(task));
459 666 : }
460 :
461 : // Get a quorum of responses or errors
462 525 : let mut responses = Vec::new();
463 525 : let mut join_errors = Vec::new();
464 525 : let mut task_errors = Vec::new();
465 525 : let mut timeout_errors = Vec::new();
466 1123 : while let Some(response) = tasks.next().await {
467 608 : match response {
468 597 : Ok(Ok(Ok(r))) => responses.push(r),
469 11 : Ok(Ok(Err(e))) => task_errors.push(e),
470 0 : Ok(Err(e)) => timeout_errors.push(e),
471 0 : Err(e) => join_errors.push(e),
472 : };
473 608 : if responses.len() >= quorum {
474 525 : break;
475 83 : }
476 83 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
477 0 : break;
478 83 : }
479 : }
480 :
481 : // In case of error, log and fail the check, but don't crash.
482 : // We're playing it safe because these errors could be transient
483 : // and we don't yet retry. Also being careful here allows us to
484 : // be backwards compatible with safekeepers that don't have the
485 : // TIMELINE_STATUS API yet.
486 525 : if responses.len() < quorum {
487 0 : error!(
488 0 : "failed sync safekeepers check {:?} {:?} {:?}",
489 0 : join_errors, task_errors, timeout_errors
490 0 : );
491 0 : return Ok(None);
492 525 : }
493 525 :
494 525 : Ok(check_if_synced(responses))
495 525 : }
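// Worked example of the quorum arithmetic above: with the usual three
// safekeepers, `quorum = 3 / 2 + 1 = 2`, so the check succeeds as soon as
// two safekeepers respond and fails once two of the pings error out.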
496 :
497 : // Fast path for sync_safekeepers. If they're already synced we get the lsn
498 : // in one roundtrip. If not, we should do a full sync_safekeepers.
499 525 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
500 525 : let start_time = Utc::now();
501 525 :
502 525 : // Run actual work with new tokio runtime
503 525 : let rt = tokio::runtime::Builder::new_current_thread()
504 525 : .enable_all()
505 525 : .build()
506 525 : .expect("failed to create rt");
507 525 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
508 525 :
509 525 : // Record runtime
510 525 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
511 525 : .signed_duration_since(start_time)
512 525 : .to_std()
513 525 : .unwrap()
514 525 : .as_millis() as u64;
515 525 : result
516 525 : }
517 :
518 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
519 : // and return the reported LSN back to the caller.
520 929 : #[instrument(skip_all)]
521 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
522 : let start_time = Utc::now();
523 :
524 : let mut sync_handle = maybe_cgexec(&self.pgbin)
525 : .args(["--sync-safekeepers"])
526 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
527 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
528 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
529 : } else {
530 : vec![]
531 : })
532 : .stdout(Stdio::piped())
533 : .stderr(Stdio::piped())
534 : .spawn()
535 : .expect("postgres --sync-safekeepers failed to start");
536 : SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
537 :
538 : // `postgres --sync-safekeepers` will print all log output to stderr and
539 : // final LSN to stdout. So we leave stdout to collect LSN, while stderr logs
540 : // will be collected in a child thread.
541 : let stderr = sync_handle
542 : .stderr
543 : .take()
544 : .expect("stderr should be captured");
545 : let logs_handle = handle_postgres_logs(stderr);
546 :
547 : let sync_output = sync_handle
548 : .wait_with_output()
549 : .expect("postgres --sync-safekeepers failed");
550 : SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
551 :
552 : // Process has exited, so we can join the logs thread.
553 : let _ = logs_handle
554 : .join()
555 0 : .map_err(|e| tracing::error!("log thread panicked: {:?}", e));
556 :
557 : if !sync_output.status.success() {
558 : anyhow::bail!(
559 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
560 : sync_output.status,
561 : String::from_utf8(sync_output.stdout)
562 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
563 : );
564 : }
565 :
566 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
567 : .signed_duration_since(start_time)
568 : .to_std()
569 : .unwrap()
570 : .as_millis() as u64;
571 :
572 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
573 :
574 : Ok(lsn)
575 : }
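// Sketch of the child invocation above (hypothetical LSN value): running
//
//     PGDATA=<pgdata> postgres --sync-safekeepers
//
// emits log output on stderr and the final LSN, e.g. "0/2000060", on
// stdout, which is then parsed with `Lsn::from_str()`.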
576 :
577 : /// Do all the preparations like PGDATA directory creation, configuration,
578 : /// safekeepers sync, basebackup, etc.
579 571 : #[instrument(skip_all)]
580 : pub fn prepare_pgdata(
581 : &self,
582 : compute_state: &ComputeState,
583 : extension_server_port: u16,
584 : ) -> Result<()> {
585 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
586 : let spec = &pspec.spec;
587 : let pgdata_path = Path::new(&self.pgdata);
588 :
589 : // Remove/create an empty pgdata directory and put configuration there.
590 : self.create_pgdata()?;
591 : config::write_postgres_conf(
592 : &pgdata_path.join("postgresql.conf"),
593 : &pspec.spec,
594 : Some(extension_server_port),
595 : )?;
596 :
597 : // Syncing safekeepers is only safe with primary nodes: if a primary
598 : // is already connected it will be kicked out, so a secondary (standby)
599 : // cannot sync safekeepers.
600 : let lsn = match spec.mode {
601 : ComputeMode::Primary => {
602 525 : info!("checking if safekeepers are synced");
603 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
604 : lsn
605 : } else {
606 404 : info!("starting safekeepers syncing");
607 : self.sync_safekeepers(pspec.storage_auth_token.clone())
608 0 : .with_context(|| "failed to sync safekeepers")?
609 : };
610 525 : info!("safekeepers synced at LSN {}", lsn);
611 : lsn
612 : }
613 : ComputeMode::Static(lsn) => {
614 44 : info!("Starting read-only node at static LSN {}", lsn);
615 : lsn
616 : }
617 : ComputeMode::Replica => {
618 2 : info!("Initializing standby from latest Pageserver LSN");
619 : Lsn(0)
620 : }
621 : };
622 :
623 571 : info!(
624 571 : "getting basebackup@{} from pageserver {}",
625 571 : lsn, &pspec.pageserver_connstr
626 571 : );
627 0 : self.get_basebackup(compute_state, lsn).with_context(|| {
628 0 : format!(
629 0 : "failed to get basebackup@{} from pageserver {}",
630 0 : lsn, &pspec.pageserver_connstr
631 0 : )
632 0 : })?;
633 :
634 : // Update pg_hba.conf received with basebackup.
635 : update_pg_hba(pgdata_path)?;
636 :
637 : // Place pg_dynshmem under /dev/shm. This allows us to use
638 : // 'dynamic_shared_memory_type = mmap' so that the files are placed in
639 : // /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
640 : //
641 : // Why on earth don't we just stick to the 'posix' default, you might
642 : // ask. It turns out that making large allocations with 'posix' doesn't
643 : // work very well with autoscaling. The behavior we want is that:
644 : //
645 : // 1. You can make large DSM allocations, larger than the current RAM
646 : // size of the VM, without errors
647 : //
648 : // 2. If the allocated memory is really used, the VM is scaled up
649 : // automatically to accommodate that
650 : //
651 : // We try to make that possible by having swap in the VM. But with the
652 : // default 'posix' DSM implementation, we fail step 1, even when there's
653 : // plenty of swap available. PostgreSQL uses posix_fallocate() to create
654 : // the shmem segment, which is really just a file in /dev/shm in Linux,
655 : // but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
656 : // than available RAM.
657 : //
658 : // Using 'dynamic_shared_memory_type = mmap' works around that, because
659 : // the Postgres 'mmap' DSM implementation doesn't use
660 : // posix_fallocate(). Instead, it uses repeated calls to write(2) to
661 : // fill the file with zeros. It's weird that that differs between
662 : // 'posix' and 'mmap', but we take advantage of it. When the file is
663 : // filled slowly with write(2), the kernel allows it to grow larger, as
664 : // long as there's swap available.
665 : //
666 : // In short, using 'dynamic_shared_memory_type = mmap' allows a DSM
667 : // segment to be larger than the currently available RAM. And because we
668 : // don't want to store it in a real file, which the kernel would try to
669 : // flush to disk, we symlink pg_dynshmem to /dev/shm.
670 : //
671 : // We don't set 'dynamic_shared_memory_type = mmap' here, we let the
672 : // control plane control that option. If 'mmap' is not used, this
673 : // symlink doesn't affect anything.
674 : //
675 : // See https://github.com/neondatabase/autoscaling/issues/800
676 : std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
677 : symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
678 :
679 : match spec.mode {
680 : ComputeMode::Primary => {}
681 : ComputeMode::Replica | ComputeMode::Static(..) => {
682 : add_standby_signal(pgdata_path)?;
683 : }
684 : }
685 :
686 : Ok(())
687 : }
688 :
689 : /// Start and stop a postgres process to warm up the VM for startup.
690 0 : pub fn prewarm_postgres(&self) -> Result<()> {
691 0 : info!("prewarming");
692 :
693 : // Create pgdata
694 0 : let pgdata = &format!("{}.warmup", self.pgdata);
695 0 : create_pgdata(pgdata)?;
696 :
697 : // Run initdb to completion
698 0 : info!("running initdb");
699 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
700 0 : Command::new(initdb_bin)
701 0 : .args(["-D", pgdata])
702 0 : .output()
703 0 : .expect("cannot start initdb process");
704 0 :
705 0 : // Write conf
706 0 : use std::io::Write;
707 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
708 0 : let mut file = std::fs::File::create(conf_path)?;
709 0 : writeln!(file, "shared_buffers=65536")?;
710 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
711 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
712 :
713 : // Start postgres
714 0 : info!("starting postgres");
715 0 : let mut pg = maybe_cgexec(&self.pgbin)
716 0 : .args(["-D", pgdata])
717 0 : .spawn()
718 0 : .expect("cannot start postgres process");
719 0 :
720 0 : // Stop it when it's ready
721 0 : info!("waiting for postgres");
722 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
723 0 : pg.kill()?;
724 0 : info!("sent kill signal");
725 0 : pg.wait()?;
726 0 : info!("done prewarming");
727 :
728 : // clean up
729 0 : let _ok = fs::remove_dir_all(pgdata);
730 0 : Ok(())
731 0 : }
732 :
733 : /// Start Postgres as a child process and wait until it is ready to accept
734 : /// connections.
735 : /// Returns a handle to the child process and a handle to the logs thread.
736 571 : #[instrument(skip_all)]
737 : pub fn start_postgres(
738 : &self,
739 : storage_auth_token: Option<String>,
740 : ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
741 : let pgdata_path = Path::new(&self.pgdata);
742 :
743 : // Run postgres as a child process.
744 : let mut pg = maybe_cgexec(&self.pgbin)
745 : .args(["-D", &self.pgdata])
746 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
747 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
748 : } else {
749 : vec![]
750 : })
751 : .stderr(Stdio::piped())
752 : .spawn()
753 : .expect("cannot start postgres process");
754 : PG_PID.store(pg.id(), Ordering::SeqCst);
755 :
756 : // Start a thread to collect logs from stderr.
757 : let stderr = pg.stderr.take().expect("stderr should be captured");
758 : let logs_handle = handle_postgres_logs(stderr);
759 :
760 : wait_for_postgres(&mut pg, pgdata_path)?;
761 :
762 : Ok((pg, logs_handle))
763 : }
764 :
765 : /// Do initial configuration of the already started Postgres.
766 8 : #[instrument(skip_all)]
767 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
768 : // If the connection fails,
769 : // it may be an old node with the `zenith_admin` superuser.
770 : //
771 : // In this case we need to connect with the old `zenith_admin` name
772 : // and create a new user. We cannot simply rename the connected user,
773 : // but we can create a new one and grant it all privileges.
774 : let connstr = self.connstr.clone();
775 : let mut client = match Client::connect(connstr.as_str(), NoTls) {
776 : Err(e) => {
777 0 : info!(
778 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
779 0 : e
780 0 : );
781 : let mut zenith_admin_connstr = connstr.clone();
782 :
783 : zenith_admin_connstr
784 : .set_username("zenith_admin")
785 0 : .map_err(|_| anyhow::anyhow!("invalid connstr"))?;
786 :
787 : let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
788 : // Disable forwarding so that users don't get a cloud_admin role
789 : client.simple_query("SET neon.forward_ddl = false")?;
790 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
791 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
792 : drop(client);
793 :
794 : // reconnect with connstring with expected name
795 : Client::connect(connstr.as_str(), NoTls)?
796 : }
797 : Ok(client) => client,
798 : };
799 :
800 : // Disable DDL forwarding because control plane already knows about these roles/databases.
801 : client.simple_query("SET neon.forward_ddl = false")?;
802 :
803 : // Proceed with post-startup configuration. Note, that order of operations is important.
804 : let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
805 : create_neon_superuser(spec, &mut client)?;
806 : cleanup_instance(&mut client)?;
807 : handle_roles(spec, &mut client)?;
808 : handle_databases(spec, &mut client)?;
809 : handle_role_deletions(spec, connstr.as_str(), &mut client)?;
810 : handle_grants(
811 : spec,
812 : &mut client,
813 : connstr.as_str(),
814 : self.has_feature(ComputeFeature::AnonExtension),
815 : )?;
816 : handle_extensions(spec, &mut client)?;
817 : handle_extension_neon(&mut client)?;
818 : create_availability_check_data(&mut client)?;
819 :
820 : // 'Close' connection
821 : drop(client);
822 :
823 : // Run migrations separately to not hold up cold starts
824 8 : thread::spawn(move || {
825 8 : let mut client = Client::connect(connstr.as_str(), NoTls)?;
826 8 : handle_migrations(&mut client)
827 8 : });
828 : Ok(())
829 : }
830 :
831 : // We could've wrapped this around `pg_ctl reload`, but right now we don't use
832 : // `pg_ctl` for start / stop, so this just seems much easier to do as we already
833 : // have an open connection to Postgres and superuser access.
834 478 : #[instrument(skip_all)]
835 : fn pg_reload_conf(&self) -> Result<()> {
836 : let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
837 : Command::new(pgctl_bin)
838 : .args(["reload", "-D", &self.pgdata])
839 : .output()
840 : .expect("cannot run pg_ctl process");
841 : Ok(())
842 : }
843 :
844 : /// Similar to `apply_config()`, but does a slightly different sequence of operations,
845 : /// as it's used to reconfigure a previously started and configured Postgres node.
846 231 : #[instrument(skip_all)]
847 : pub fn reconfigure(&self) -> Result<()> {
848 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
849 :
850 : if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
851 0 : info!("tuning pgbouncer");
852 :
853 : let rt = tokio::runtime::Builder::new_current_thread()
854 : .enable_all()
855 : .build()
856 : .expect("failed to create rt");
857 :
858 : // Spawn a thread to do the tuning,
859 : // so that we don't block the main thread that starts Postgres.
860 : let pgbouncer_settings = pgbouncer_settings.clone();
861 0 : let _handle = thread::spawn(move || {
862 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
863 0 : if let Err(err) = res {
864 0 : error!("error while tuning pgbouncer: {err:?}");
865 0 : }
866 0 : });
867 : }
868 :
869 : // Write new config
870 : let pgdata_path = Path::new(&self.pgdata);
871 : let postgresql_conf_path = pgdata_path.join("postgresql.conf");
872 : config::write_postgres_conf(&postgresql_conf_path, &spec, None)?;
873 : // temporarily reset max_cluster_size in config
874 : // to avoid the possibility of hitting the limit, while we are reconfiguring:
875 : // creating new extensions, roles, etc...
876 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
877 : self.pg_reload_conf()?;
878 :
879 : let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
880 :
881 : // Proceed with post-startup configuration. Note, that order of operations is important.
882 : // Disable DDL forwarding because control plane already knows about these roles/databases.
883 : if spec.mode == ComputeMode::Primary {
884 : client.simple_query("SET neon.forward_ddl = false")?;
885 : cleanup_instance(&mut client)?;
886 : handle_roles(&spec, &mut client)?;
887 : handle_databases(&spec, &mut client)?;
888 : handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
889 : handle_grants(
890 : &spec,
891 : &mut client,
892 : self.connstr.as_str(),
893 : self.has_feature(ComputeFeature::AnonExtension),
894 : )?;
895 : handle_extensions(&spec, &mut client)?;
896 : handle_extension_neon(&mut client)?;
897 : // We can skip handle_migrations here because a new migration can only appear
898 : // if we have a new version of the compute_ctl binary, which can only happen
899 : // if compute got restarted, in which case we'll end up inside of apply_config
900 : // instead of reconfigure.
901 : }
902 :
903 : // 'Close' connection
904 : drop(client);
905 :
906 : // reset max_cluster_size in config back to original value and reload config
907 : config::compute_ctl_temp_override_remove(pgdata_path)?;
908 : self.pg_reload_conf()?;
909 :
910 : let unknown_op = "unknown".to_string();
911 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
912 231 : info!(
913 231 : "finished reconfiguration of compute node for operation {}",
914 231 : op_id
915 231 : );
916 :
917 : Ok(())
918 : }
919 :
920 571 : #[instrument(skip_all)]
921 : pub fn start_compute(
922 : &self,
923 : extension_server_port: u16,
924 : ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
925 : let compute_state = self.state.lock().unwrap().clone();
926 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
927 571 : info!(
928 571 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
929 571 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
930 571 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
931 571 : pspec.tenant_id,
932 571 : pspec.timeline_id,
933 571 : );
934 :
935 : // tune pgbouncer
936 : if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
937 0 : info!("tuning pgbouncer");
938 :
939 : let rt = tokio::runtime::Builder::new_current_thread()
940 : .enable_all()
941 : .build()
942 : .expect("failed to create rt");
943 :
944 : // Spawn a thread to do the tuning,
945 : // so that we don't block the main thread that starts Postgres.
946 : let pgbouncer_settings = pgbouncer_settings.clone();
947 0 : let _handle = thread::spawn(move || {
948 0 : let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
949 0 : if let Err(err) = res {
950 0 : error!("error while tuning pgbouncer: {err:?}");
951 0 : }
952 0 : });
953 : }
954 :
955 571 : info!(
956 571 : "start_compute spec.remote_extensions {:?}",
957 571 : pspec.spec.remote_extensions
958 571 : );
959 :
960 : // This part is sync, because we need to download
961 : // remote shared_preload_libraries before postgres start (if any)
962 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
963 : // First, create control files for all available extensions
964 : extension_server::create_control_files(remote_extensions, &self.pgbin);
965 :
966 : let library_load_start_time = Utc::now();
967 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
968 :
969 : let library_load_time = Utc::now()
970 : .signed_duration_since(library_load_start_time)
971 : .to_std()
972 : .unwrap()
973 : .as_millis() as u64;
974 : let mut state = self.state.lock().unwrap();
975 : state.metrics.load_ext_ms = library_load_time;
976 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
977 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
978 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
979 1 : info!(
980 1 : "Loading shared_preload_libraries took {:?}ms",
981 1 : library_load_time
982 1 : );
983 1 : info!("{:?}", remote_ext_metrics);
984 : }
985 :
986 : self.prepare_pgdata(&compute_state, extension_server_port)?;
987 :
988 : let start_time = Utc::now();
989 : let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
990 :
991 : let config_time = Utc::now();
992 : if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
993 : let pgdata_path = Path::new(&self.pgdata);
994 : // temporarily reset max_cluster_size in config
995 : // to avoid the possibility of hitting the limit, while we are applying config:
996 : // creating new extensions, roles, etc...
997 : config::compute_ctl_temp_override_create(pgdata_path, "neon.max_cluster_size=-1")?;
998 : self.pg_reload_conf()?;
999 :
1000 : self.apply_config(&compute_state)?;
1001 :
1002 : config::compute_ctl_temp_override_remove(pgdata_path)?;
1003 : self.pg_reload_conf()?;
1004 : }
1005 :
1006 : let startup_end_time = Utc::now();
1007 : {
1008 : let mut state = self.state.lock().unwrap();
1009 : state.metrics.start_postgres_ms = config_time
1010 : .signed_duration_since(start_time)
1011 : .to_std()
1012 : .unwrap()
1013 : .as_millis() as u64;
1014 : state.metrics.config_ms = startup_end_time
1015 : .signed_duration_since(config_time)
1016 : .to_std()
1017 : .unwrap()
1018 : .as_millis() as u64;
1019 : state.metrics.total_startup_ms = startup_end_time
1020 : .signed_duration_since(compute_state.start_time)
1021 : .to_std()
1022 : .unwrap()
1023 : .as_millis() as u64;
1024 : }
1025 : self.set_status(ComputeStatus::Running);
1026 :
1027 571 : info!(
1028 571 : "finished configuration of compute for project {}",
1029 571 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
1030 571 : );
1031 :
1032 : // Log metrics so that we can search for slow operations in logs
1033 : let metrics = {
1034 : let state = self.state.lock().unwrap();
1035 : state.metrics.clone()
1036 : };
1037 571 : info!(?metrics, "compute start finished");
1038 :
1039 : Ok(pg_process)
1040 : }
1041 :
1042 : /// Update `last_active` in the shared state, but only if the new value is more recent.
1043 11823 : pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
1044 11823 : let mut state = self.state.lock().unwrap();
1045 11823 : // NB: `Some(<DateTime>)` is always greater than `None`.
1046 11823 : if last_active > state.last_active {
1047 317 : state.last_active = last_active;
1048 317 : debug!("set the last compute activity time to: {:?}", last_active);
1049 11506 : }
1050 11823 : }
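// The `Some(_) > None` comparison relied on above comes from the derived
// ordering on `Option`: `None` sorts before any `Some`, e.g.
//
//     assert!(Some(Utc::now()) > None);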
1051 :
1052 : // Look for core dumps and collect backtraces.
1053 : //
1054 : // EKS worker nodes have the following core dump settings:
1055 : // /proc/sys/kernel/core_pattern -> core
1056 : // /proc/sys/kernel/core_uses_pid -> 1
1057 : // ulimit -c -> unlimited
1058 : // which results in core dumps being written to the postgres data directory as core.<pid>.
1059 : //
1060 : // Use that as the default location and pattern, except on macOS, where core dumps
1061 : // are written to the /cores/ directory by default.
1062 570 : pub fn check_for_core_dumps(&self) -> Result<()> {
1063 570 : let core_dump_dir = match std::env::consts::OS {
1064 570 : "macos" => Path::new("/cores/"),
1065 570 : _ => Path::new(&self.pgdata),
1066 : };
1067 :
1068 : // Collect core dump paths if any
1069 570 : info!("checking for core dumps in {}", core_dump_dir.display());
1070 570 : let files = fs::read_dir(core_dump_dir)?;
1071 13731 : let cores = files.filter_map(|entry| {
1072 13731 : let entry = entry.ok()?;
1073 13731 : let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
1074 0 : Some(entry.path())
1075 13731 : });
1076 :
1077 : // Print backtrace for each core dump
1078 570 : for core_path in cores {
1079 0 : warn!(
1080 0 : "core dump found: {}, collecting backtrace",
1081 0 : core_path.display()
1082 0 : );
1083 :
1084 : // Try first with gdb
1085 0 : let backtrace = Command::new("gdb")
1086 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
1087 0 : .arg(&core_path)
1088 0 : .output();
1089 :
1090 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
1091 0 : let backtrace = match backtrace {
1092 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
1093 0 : warn!("cannot find gdb, trying lldb");
1094 0 : Command::new("lldb")
1095 0 : .arg("-c")
1096 0 : .arg(&core_path)
1097 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
1098 0 : .output()
1099 : }
1100 0 : _ => backtrace,
1101 0 : }?;
1102 :
1103 0 : warn!(
1104 0 : "core dump backtrace: {}",
1105 0 : String::from_utf8_lossy(&backtrace.stdout)
1106 0 : );
1107 0 : warn!(
1108 0 : "debugger stderr: {}",
1109 0 : String::from_utf8_lossy(&backtrace.stderr)
1110 0 : );
1111 : }
1112 :
1113 570 : Ok(())
1114 570 : }
1115 :
1116 : /// Select `pg_stat_statements` data and return it as stringified JSON.
1117 0 : pub async fn collect_insights(&self) -> String {
1118 0 : let mut result_rows: Vec<String> = Vec::new();
1119 0 : let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
1120 0 : let (client, connection) = connect_result.unwrap();
1121 0 : tokio::spawn(async move {
1122 0 : if let Err(e) = connection.await {
1123 0 : eprintln!("connection error: {}", e);
1124 0 : }
1125 0 : });
1126 0 : let result = client
1127 0 : .simple_query(
1128 0 : "SELECT
1129 0 : row_to_json(pg_stat_statements)
1130 0 : FROM
1131 0 : pg_stat_statements
1132 0 : WHERE
1133 0 : userid != 'cloud_admin'::regrole::oid
1134 0 : ORDER BY
1135 0 : (mean_exec_time + mean_plan_time) DESC
1136 0 : LIMIT 100",
1137 0 : )
1138 0 : .await;
1139 :
1140 0 : if let Ok(raw_rows) = result {
1141 0 : for message in raw_rows.iter() {
1142 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
1143 0 : if let Some(json) = row.get(0) {
1144 0 : result_rows.push(json.to_string());
1145 0 : }
1146 0 : }
1147 : }
1148 :
1149 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
1150 : } else {
1151 0 : "{{\"pg_stat_statements\": []}}".to_string()
1152 : }
1153 0 : }
1154 :
1155 : // Download an archive, unzip it, and place the files in the correct locations.
1156 2 : pub async fn download_extension(
1157 2 : &self,
1158 2 : real_ext_name: String,
1159 2 : ext_path: RemotePath,
1160 2 : ) -> Result<u64, DownloadError> {
1161 2 : let ext_remote_storage =
1162 2 : self.ext_remote_storage
1163 2 : .as_ref()
1164 2 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
1165 2 : "Remote extensions storage is not configured",
1166 2 : )))?;
1167 :
1168 2 : let ext_archive_name = ext_path.object_name().expect("bad path");
1169 2 :
1170 2 : let mut first_try = false;
1171 2 : if !self
1172 2 : .ext_download_progress
1173 2 : .read()
1174 2 : .expect("lock err")
1175 2 : .contains_key(ext_archive_name)
1176 1 : {
1177 1 : self.ext_download_progress
1178 1 : .write()
1179 1 : .expect("lock err")
1180 1 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
1181 1 : first_try = true;
1182 1 : }
1183 2 : let (download_start, download_completed) =
1184 2 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
1185 2 : let start_time_delta = Utc::now()
1186 2 : .signed_duration_since(download_start)
1187 2 : .to_std()
1188 2 : .unwrap()
1189 2 : .as_millis() as u64;
1190 2 :
1191 2 : // how long to wait for extension download if it was started by another process
1192 2 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
1193 2 :
1194 2 : if download_completed {
1195 1 : info!("extension already downloaded, skipping re-download");
1196 1 : return Ok(0);
1197 1 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
1198 0 : info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
1199 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
1200 : loop {
1201 0 : info!("waiting for download");
1202 0 : interval.tick().await;
1203 0 : let (_, download_completed_now) =
1204 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
1205 0 : if download_completed_now {
1206 0 : info!("download finished by whoever else downloaded it");
1207 0 : return Ok(0);
1208 0 : }
1209 : }
1210 : // NOTE: the above loop will get terminated
1211 : // based on the timeout of the download function
1212 1 : }
1213 :
1214 : // If the extension hasn't been downloaded before, or the previous
1215 : // download attempt started at least HANG_TIMEOUT ms ago,
1216 : // then we try to download it here.
1217 1 : info!("downloading new extension {ext_archive_name}");
1218 :
1219 1 : let download_size = extension_server::download_extension(
1220 1 : &real_ext_name,
1221 1 : &ext_path,
1222 1 : ext_remote_storage,
1223 1 : &self.pgbin,
1224 1 : )
1225 25 : .await
1226 1 : .map_err(DownloadError::Other);
1227 1 :
1228 1 : self.ext_download_progress
1229 1 : .write()
1230 1 : .expect("bad lock")
1231 1 : .insert(ext_archive_name.to_string(), (download_start, true));
1232 1 :
1233 1 : download_size
1234 2 : }
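// Timeline of the coordination above for two concurrent callers: the first
// caller inserts a progress entry and downloads the archive; a second
// caller arriving within HANG_TIMEOUT (3000 ms) polls `ext_download_progress`
// every 500 ms and returns Ok(0) once the completion flag flips to `true`.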
1235 :
1236 : #[tokio::main]
1237 1 : pub async fn prepare_preload_libraries(
1238 1 : &self,
1239 1 : spec: &ComputeSpec,
1240 1 : ) -> Result<RemoteExtensionMetrics> {
1241 1 : if self.ext_remote_storage.is_none() {
1242 1 : return Ok(RemoteExtensionMetrics {
1243 0 : num_ext_downloaded: 0,
1244 0 : largest_ext_size: 0,
1245 0 : total_ext_download_size: 0,
1246 0 : });
1247 1 : }
1248 1 : let remote_extensions = spec
1249 1 : .remote_extensions
1250 1 : .as_ref()
1251 1 : .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
1252 1 :
1253 1 : info!("parse shared_preload_libraries from spec.cluster.settings");
1254 1 : let mut libs_vec = Vec::new();
1255 1 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1256 0 : libs_vec = libs
1257 0 : .split(&[',', '\'', ' '])
1258 0 : .filter(|s| *s != "neon" && !s.is_empty())
1259 0 : .map(str::to_string)
1260 0 : .collect();
1261 1 : }
1262 1 : info!("parse shared_preload_libraries from provided postgresql.conf");
1263 1 :
1264 1 : // The postgresql_conf field is used by neon_local and the python tests.
1265 1 : if let Some(conf) = &spec.cluster.postgresql_conf {
1266 1 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1267 1 : let mut shared_preload_libraries_line = "";
1268 22 : for line in conf_lines {
1269 21 : if line.starts_with("shared_preload_libraries") {
1270 1 : shared_preload_libraries_line = line;
1271 20 : }
1272 1 : }
1273 1 : let mut preload_libs_vec = Vec::new();
1274 1 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1275 0 : preload_libs_vec = libs
1276 0 : .split(&[',', '\'', ' '])
1277 0 : .filter(|s| *s != "neon" && !s.is_empty())
1278 0 : .map(str::to_string)
1279 0 : .collect();
1280 1 : }
1281 1 : libs_vec.extend(preload_libs_vec);
1282 1 : }
1283 1 :
1284 1 : // Don't try to download libraries that are not in the index.
1285 1 : // Assume that they are already present locally.
1286 1 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1287 1 :
1288 1 : info!("Downloading to shared preload libraries: {:?}", &libs_vec);
1289 1 :
1290 1 : let mut download_tasks = Vec::new();
1291 1 : for library in &libs_vec {
1292 0 : let (ext_name, ext_path) =
1293 1 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1294 1 : download_tasks.push(self.download_extension(ext_name, ext_path));
1295 1 : }
1296 1 : let results = join_all(download_tasks).await;
1297 1 :
1298 1 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1299 1 : num_ext_downloaded: 0,
1300 1 : largest_ext_size: 0,
1301 1 : total_ext_download_size: 0,
1302 1 : };
1303 1 : for result in results {
1304 0 : let download_size = match result {
1305 1 : Ok(res) => {
1306 0 : remote_ext_metrics.num_ext_downloaded += 1;
1307 0 : res
1308 1 : }
1309 1 : Err(err) => {
1310 0 : // if we failed to download an extension, we don't want to fail the whole
1311 0 : // process, but we do want to log the error
1312 0 : error!("Failed to download extension: {}", err);
1313 1 : 0
1314 1 : }
1315 1 : };
1316 1 :
1317 1 : remote_ext_metrics.largest_ext_size =
1318 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1319 0 : remote_ext_metrics.total_ext_download_size += download_size;
1320 1 : }
1321 1 : Ok(remote_ext_metrics)
1322 1 : }
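// Illustration of the postgresql.conf parsing above: a line such as
//
//     shared_preload_libraries='neon,pg_stat_statements'
//
// is split on "='" and then on [',', '\'', ' ']; the "neon" entry and empty
// strings are filtered out, leaving only "pg_stat_statements" to download
// (provided it appears in the remote extension index).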
1323 : }