Line data Source code
1 : use std::collections::HashMap;
2 : use std::env;
3 : use std::fs;
4 : use std::io::BufRead;
5 : use std::os::unix::fs::PermissionsExt;
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::{Condvar, Mutex, RwLock};
10 : use std::time::Instant;
11 :
12 : use anyhow::{Context, Result};
13 : use chrono::{DateTime, Utc};
14 : use futures::future::join_all;
15 : use futures::stream::FuturesUnordered;
16 : use futures::StreamExt;
17 : use postgres::{Client, NoTls};
18 : use tokio;
19 : use tokio_postgres;
20 : use tracing::{error, info, instrument, warn};
21 : use utils::id::{TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 :
24 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
25 : use compute_api::spec::{ComputeMode, ComputeSpec};
26 : use utils::measured_stream::MeasuredReader;
27 :
28 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
29 :
30 : use crate::checker::create_availability_check_data;
31 : use crate::pg_helpers::*;
32 : use crate::spec::*;
33 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
34 : use crate::{config, extension_server};
35 :
36 : /// Compute node info shared across several `compute_ctl` threads.
37 : pub struct ComputeNode {
38 : // Url type maintains proper escaping
39 : pub connstr: url::Url,
40 : pub pgdata: String,
41 : pub pgbin: String,
42 : pub pgversion: String,
43 : /// We should only allow live re- / configuration of the compute node if
44 : /// it uses 'pull model', i.e. it can go to control-plane and fetch
45 : /// the latest configuration. Otherwise, there could be a case:
46 : /// - we start compute with some spec provided as argument
47 : /// - we push new spec and it does reconfiguration
48 : /// - but then something happens and compute pod / VM is destroyed,
49 : /// so k8s controller starts it again with the **old** spec
50 : /// and the same for empty computes:
51 : /// - we started compute without any spec
52 : /// - we push spec and it does configuration
53 : /// - but then it is restarted without any spec again
54 : pub live_config_allowed: bool,
55 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
56 : /// To allow HTTP API server to serving status requests, while configuration
57 : /// is in progress, lock should be held only for short periods of time to do
58 : /// read/write, not the whole configuration process.
59 : pub state: Mutex<ComputeState>,
60 : /// `Condvar` to allow notifying waiters about state changes.
61 : pub state_changed: Condvar,
62 : /// the S3 bucket that we search for extensions in
63 : pub ext_remote_storage: Option<GenericRemoteStorage>,
64 : // key: ext_archive_name, value: started download time, download_completed?
65 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
66 : pub build_tag: String,
67 : }
68 :
/// Download-size statistics for remote extensions; these sizes may have an
/// impact on startup time.
#[derive(Clone, Debug)]
pub struct RemoteExtensionMetrics {
    num_ext_downloaded: u64,
    largest_ext_size: u64,
    total_ext_download_size: u64,
}
76 :
77 1326 : #[derive(Clone, Debug)]
78 : pub struct ComputeState {
79 : pub start_time: DateTime<Utc>,
80 : pub status: ComputeStatus,
81 : /// Timestamp of the last Postgres activity. It could be `None` if
82 : /// compute wasn't used since start.
83 : pub last_active: Option<DateTime<Utc>>,
84 : pub error: Option<String>,
85 : pub pspec: Option<ParsedSpec>,
86 : pub metrics: ComputeMetrics,
87 : }
88 :
89 : impl ComputeState {
90 663 : pub fn new() -> Self {
91 663 : Self {
92 663 : start_time: Utc::now(),
93 663 : status: ComputeStatus::Empty,
94 663 : last_active: None,
95 663 : error: None,
96 663 : pspec: None,
97 663 : metrics: ComputeMetrics::default(),
98 663 : }
99 663 : }
100 : }
101 :
102 : impl Default for ComputeState {
103 0 : fn default() -> Self {
104 0 : Self::new()
105 0 : }
106 : }
107 :
108 1900 : #[derive(Clone, Debug)]
109 : pub struct ParsedSpec {
110 : pub spec: ComputeSpec,
111 : pub tenant_id: TenantId,
112 : pub timeline_id: TimelineId,
113 : pub pageserver_connstr: String,
114 : pub safekeeper_connstrings: Vec<String>,
115 : pub storage_auth_token: Option<String>,
116 : }
117 :
118 : impl TryFrom<ComputeSpec> for ParsedSpec {
119 : type Error = String;
120 663 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
121 : // Extract the options from the spec file that are needed to connect to
122 : // the storage system.
123 : //
124 : // For backwards-compatibility, the top-level fields in the spec file
125 : // may be empty. In that case, we need to dig them from the GUCs in the
126 : // cluster.settings field.
127 663 : let pageserver_connstr = spec
128 663 : .pageserver_connstring
129 663 : .clone()
130 663 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
131 663 : .ok_or("pageserver connstr should be provided")?;
132 663 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
133 89 : if matches!(spec.mode, ComputeMode::Primary) {
134 0 : spec.cluster
135 0 : .settings
136 0 : .find("neon.safekeepers")
137 0 : .ok_or("safekeeper connstrings should be provided")?
138 0 : .split(',')
139 0 : .map(|str| str.to_string())
140 0 : .collect()
141 : } else {
142 89 : vec![]
143 : }
144 : } else {
145 574 : spec.safekeeper_connstrings.clone()
146 : };
147 663 : let storage_auth_token = spec.storage_auth_token.clone();
148 663 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
149 663 : tenant_id
150 : } else {
151 0 : spec.cluster
152 0 : .settings
153 0 : .find("neon.tenant_id")
154 0 : .ok_or("tenant id should be provided")
155 0 : .map(|s| TenantId::from_str(&s))?
156 0 : .or(Err("invalid tenant id"))?
157 : };
158 663 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
159 663 : timeline_id
160 : } else {
161 0 : spec.cluster
162 0 : .settings
163 0 : .find("neon.timeline_id")
164 0 : .ok_or("timeline id should be provided")
165 0 : .map(|s| TimelineId::from_str(&s))?
166 0 : .or(Err("invalid timeline id"))?
167 : };
168 :
169 663 : Ok(ParsedSpec {
170 663 : spec,
171 663 : pageserver_connstr,
172 663 : safekeeper_connstrings,
173 663 : storage_auth_token,
174 663 : tenant_id,
175 663 : timeline_id,
176 663 : })
177 663 : }
178 : }
179 :
/// Build the [`Command`] used to launch `cmd`.
///
/// When running as a VM (the `AUTOSCALING` environment variable is set) the
/// command is wrapped in `cgexec -g memory:neon-postgres` so that it runs in
/// the `neon-postgres` cgroup; otherwise it is a plain `Command::new(cmd)`.
///
/// Use this to start postgres: placing it in the neon-postgres cgroup lets
/// autoscaling control postgres' resource usage. The cgroup exists in VMs
/// because vm-builder creates it during the sysinit phase of its inittab.
fn maybe_cgexec(cmd: &str) -> Command {
    // The cplane sets this variable for autoscaling computes. `var_os` is
    // used so that a value that is not valid unicode cannot cause trouble.
    let autoscaling = env::var_os("AUTOSCALING").is_some();
    if !autoscaling {
        return Command::new(cmd);
    }

    let mut wrapped = Command::new("cgexec");
    wrapped.args(["-g", "memory:neon-postgres"]).arg(cmd);
    wrapped
}
200 :
201 : /// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
202 : /// that we give to customers
203 0 : fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
204 0 : let roles = spec
205 0 : .cluster
206 0 : .roles
207 0 : .iter()
208 0 : .map(|r| escape_literal(&r.name))
209 0 : .collect::<Vec<_>>();
210 0 :
211 0 : let dbs = spec
212 0 : .cluster
213 0 : .databases
214 0 : .iter()
215 0 : .map(|db| escape_literal(&db.name))
216 0 : .collect::<Vec<_>>();
217 :
218 0 : let roles_decl = if roles.is_empty() {
219 0 : String::from("roles text[] := NULL;")
220 : } else {
221 0 : format!(
222 0 : r#"
223 0 : roles text[] := ARRAY(SELECT rolname
224 0 : FROM pg_catalog.pg_roles
225 0 : WHERE rolname IN ({}));"#,
226 0 : roles.join(", ")
227 0 : )
228 : };
229 :
230 0 : let database_decl = if dbs.is_empty() {
231 0 : String::from("dbs text[] := NULL;")
232 : } else {
233 0 : format!(
234 0 : r#"
235 0 : dbs text[] := ARRAY(SELECT datname
236 0 : FROM pg_catalog.pg_database
237 0 : WHERE datname IN ({}));"#,
238 0 : dbs.join(", ")
239 0 : )
240 : };
241 :
242 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
243 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
244 0 : let query = format!(
245 0 : r#"
246 0 : DO $$
247 0 : DECLARE
248 0 : r text;
249 0 : {}
250 0 : {}
251 0 : BEGIN
252 0 : IF NOT EXISTS (
253 0 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
254 0 : THEN
255 0 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
256 0 : IF array_length(roles, 1) IS NOT NULL THEN
257 0 : EXECUTE format('GRANT neon_superuser TO %s',
258 0 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
259 0 : FOREACH r IN ARRAY roles LOOP
260 0 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
261 0 : END LOOP;
262 0 : END IF;
263 0 : IF array_length(dbs, 1) IS NOT NULL THEN
264 0 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
265 0 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
266 0 : END IF;
267 0 : END IF;
268 0 : END
269 0 : $$;"#,
270 0 : roles_decl, database_decl,
271 0 : );
272 0 : info!("Neon superuser created:\n{}", &query);
273 0 : client
274 0 : .simple_query(&query)
275 0 : .map_err(|e| anyhow::anyhow!(e).context(query))?;
276 0 : Ok(())
277 0 : }
278 :
279 : impl ComputeNode {
280 654 : pub fn set_status(&self, status: ComputeStatus) {
281 654 : let mut state = self.state.lock().unwrap();
282 654 : state.status = status;
283 654 : self.state_changed.notify_all();
284 654 : }
285 :
286 0 : pub fn get_status(&self) -> ComputeStatus {
287 0 : self.state.lock().unwrap().status
288 0 : }
289 :
290 : // Remove `pgdata` directory and create it again with right permissions.
291 663 : fn create_pgdata(&self) -> Result<()> {
292 663 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
293 663 : // If it is something different then create_dir() will error out anyway.
294 663 : let _ok = fs::remove_dir_all(&self.pgdata);
295 663 : fs::create_dir(&self.pgdata)?;
296 663 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
297 :
298 663 : Ok(())
299 663 : }
300 :
301 : // Get basebackup from the libpq connection to pageserver using `connstr` and
302 : // unarchive it to `pgdata` directory overriding all its previous content.
303 663 : #[instrument(skip_all, fields(%lsn))]
304 : fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
305 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
306 : let start_time = Instant::now();
307 :
308 : let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
309 :
310 : // Use the storage auth token from the config file, if given.
311 : // Note: this overrides any password set in the connection string.
312 : if let Some(storage_auth_token) = &spec.storage_auth_token {
313 15 : info!("Got storage auth token from spec file");
314 : config.password(storage_auth_token);
315 : } else {
316 648 : info!("Storage auth token not set");
317 : }
318 :
319 : // Connect to pageserver
320 : let mut client = config.connect(NoTls)?;
321 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
322 :
323 : let basebackup_cmd = match lsn {
324 : // HACK We don't use compression on first start (Lsn(0)) because there's no API for it
325 : Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
326 : _ => format!(
327 : "basebackup {} {} {} --gzip",
328 : spec.tenant_id, spec.timeline_id, lsn
329 : ),
330 : };
331 :
332 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
333 : let mut measured_reader = MeasuredReader::new(copyreader);
334 :
335 : // Check the magic number to see if it's a gzip or not. Even though
336 : // we might explicitly ask for gzip, an old pageserver with no implementation
337 : // of gzip compression might send us uncompressed data. After some time
338 : // passes we can assume all pageservers know how to compress and we can
339 : // delete this check.
340 : //
341 : // If the data is not gzip, it will be tar. It will not be mistakenly
342 : // recognized as gzip because tar starts with an ascii encoding of a filename,
343 : // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
344 : // we send the "global" directory first from the pageserver, so it definitely
345 : // won't be recognized as gzip.
346 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
347 : let gzip = {
348 : let peek = bufreader.fill_buf().unwrap();
349 : peek[0] == 0x1f && peek[1] == 0x8b
350 : };
351 :
352 : // Read the archive directly from the `CopyOutReader`
353 : //
354 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
355 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
356 : // sends an Error after finishing the tarball, we will not notice it.
357 : if gzip {
358 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
359 : ar.set_ignore_zeros(true);
360 : ar.unpack(&self.pgdata)?;
361 : } else {
362 : let mut ar = tar::Archive::new(&mut bufreader);
363 : ar.set_ignore_zeros(true);
364 : ar.unpack(&self.pgdata)?;
365 : };
366 :
367 : // Report metrics
368 : let mut state = self.state.lock().unwrap();
369 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
370 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
371 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
372 : Ok(())
373 : }
374 :
375 574 : pub async fn check_safekeepers_synced_async(
376 574 : &self,
377 574 : compute_state: &ComputeState,
378 574 : ) -> Result<Option<Lsn>> {
379 574 : // Construct a connection config for each safekeeper
380 574 : let pspec: ParsedSpec = compute_state
381 574 : .pspec
382 574 : .as_ref()
383 574 : .expect("spec must be set")
384 574 : .clone();
385 574 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
386 777 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
387 777 : // Format connstr
388 777 : let id = connstr.clone();
389 777 : let connstr = format!("postgresql://no_user@{}", connstr);
390 777 : let options = format!(
391 777 : "-c timeline_id={} tenant_id={}",
392 777 : pspec.timeline_id, pspec.tenant_id
393 777 : );
394 777 :
395 777 : // Construct client
396 777 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
397 777 : config.options(&options);
398 777 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
399 26 : config.password(storage_auth_token);
400 751 : }
401 :
402 777 : (id, config)
403 777 : });
404 574 :
405 574 : // Create task set to query all safekeepers
406 574 : let mut tasks = FuturesUnordered::new();
407 574 : let quorum = sk_configs.len() / 2 + 1;
408 1351 : for (id, config) in sk_configs {
409 777 : let timeout = tokio::time::Duration::from_millis(100);
410 777 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
411 777 : tasks.push(tokio::spawn(task));
412 777 : }
413 :
414 : // Get a quorum of responses or errors
415 574 : let mut responses = Vec::new();
416 574 : let mut join_errors = Vec::new();
417 574 : let mut task_errors = Vec::new();
418 574 : let mut timeout_errors = Vec::new();
419 1217 : while let Some(response) = tasks.next().await {
420 681 : match response {
421 677 : Ok(Ok(Ok(r))) => responses.push(r),
422 4 : Ok(Ok(Err(e))) => task_errors.push(e),
423 0 : Ok(Err(e)) => timeout_errors.push(e),
424 0 : Err(e) => join_errors.push(e),
425 : };
426 681 : if responses.len() >= quorum {
427 574 : break;
428 107 : }
429 107 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
430 0 : break;
431 107 : }
432 : }
433 :
434 : // In case of error, log and fail the check, but don't crash.
435 : // We're playing it safe because these errors could be transient
436 : // and we don't yet retry. Also being careful here allows us to
437 : // be backwards compatible with safekeepers that don't have the
438 : // TIMELINE_STATUS API yet.
439 574 : if responses.len() < quorum {
440 0 : error!(
441 0 : "failed sync safekeepers check {:?} {:?} {:?}",
442 0 : join_errors, task_errors, timeout_errors
443 0 : );
444 0 : return Ok(None);
445 574 : }
446 574 :
447 574 : Ok(check_if_synced(responses))
448 574 : }
449 :
450 : // Fast path for sync_safekeepers. If they're already synced we get the lsn
451 : // in one roundtrip. If not, we should do a full sync_safekeepers.
452 574 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
453 574 : let start_time = Utc::now();
454 574 :
455 574 : // Run actual work with new tokio runtime
456 574 : let rt = tokio::runtime::Builder::new_current_thread()
457 574 : .enable_all()
458 574 : .build()
459 574 : .expect("failed to create rt");
460 574 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
461 574 :
462 574 : // Record runtime
463 574 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
464 574 : .signed_duration_since(start_time)
465 574 : .to_std()
466 574 : .unwrap()
467 574 : .as_millis() as u64;
468 574 : result
469 574 : }
470 :
471 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
472 : // and return the reported LSN back to the caller.
473 1017 : #[instrument(skip_all)]
474 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
475 : let start_time = Utc::now();
476 :
477 : let sync_handle = maybe_cgexec(&self.pgbin)
478 : .args(["--sync-safekeepers"])
479 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
480 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
481 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
482 : } else {
483 : vec![]
484 : })
485 : .stdout(Stdio::piped())
486 : .spawn()
487 : .expect("postgres --sync-safekeepers failed to start");
488 :
489 : // `postgres --sync-safekeepers` will print all log output to stderr and
490 : // final LSN to stdout. So we pipe only stdout, while stderr will be automatically
491 : // redirected to the caller output.
492 : let sync_output = sync_handle
493 : .wait_with_output()
494 : .expect("postgres --sync-safekeepers failed");
495 :
496 : if !sync_output.status.success() {
497 : anyhow::bail!(
498 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
499 : sync_output.status,
500 : String::from_utf8(sync_output.stdout)
501 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
502 : );
503 : }
504 :
505 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
506 : .signed_duration_since(start_time)
507 : .to_std()
508 : .unwrap()
509 : .as_millis() as u64;
510 :
511 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
512 :
513 : Ok(lsn)
514 : }
515 :
516 : /// Do all the preparations like PGDATA directory creation, configuration,
517 : /// safekeepers sync, basebackup, etc.
518 663 : #[instrument(skip_all)]
519 : pub fn prepare_pgdata(
520 : &self,
521 : compute_state: &ComputeState,
522 : extension_server_port: u16,
523 : ) -> Result<()> {
524 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
525 : let spec = &pspec.spec;
526 : let pgdata_path = Path::new(&self.pgdata);
527 :
528 : // Remove/create an empty pgdata directory and put configuration there.
529 : self.create_pgdata()?;
530 : config::write_postgres_conf(
531 : &pgdata_path.join("postgresql.conf"),
532 : &pspec.spec,
533 : Some(extension_server_port),
534 : )?;
535 :
536 : // Syncing safekeepers is only safe with primary nodes: if a primary
537 : // is already connected it will be kicked out, so a secondary (standby)
538 : // cannot sync safekeepers.
539 : let lsn = match spec.mode {
540 : ComputeMode::Primary => {
541 574 : info!("checking if safekeepers are synced");
542 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
543 : lsn
544 : } else {
545 443 : info!("starting safekeepers syncing");
546 : self.sync_safekeepers(pspec.storage_auth_token.clone())
547 0 : .with_context(|| "failed to sync safekeepers")?
548 : };
549 574 : info!("safekeepers synced at LSN {}", lsn);
550 : lsn
551 : }
552 : ComputeMode::Static(lsn) => {
553 88 : info!("Starting read-only node at static LSN {}", lsn);
554 : lsn
555 : }
556 : ComputeMode::Replica => {
557 1 : info!("Initializing standby from latest Pageserver LSN");
558 : Lsn(0)
559 : }
560 : };
561 :
562 663 : info!(
563 663 : "getting basebackup@{} from pageserver {}",
564 663 : lsn, &pspec.pageserver_connstr
565 663 : );
566 9 : self.get_basebackup(compute_state, lsn).with_context(|| {
567 9 : format!(
568 9 : "failed to get basebackup@{} from pageserver {}",
569 9 : lsn, &pspec.pageserver_connstr
570 9 : )
571 9 : })?;
572 :
573 : // Update pg_hba.conf received with basebackup.
574 : update_pg_hba(pgdata_path)?;
575 :
576 : match spec.mode {
577 : ComputeMode::Primary => {}
578 : ComputeMode::Replica | ComputeMode::Static(..) => {
579 : add_standby_signal(pgdata_path)?;
580 : }
581 : }
582 :
583 : Ok(())
584 : }
585 :
586 : /// Start and stop a postgres process to warm up the VM for startup.
587 0 : pub fn prewarm_postgres(&self) -> Result<()> {
588 0 : info!("prewarming");
589 :
590 : // Create pgdata
591 0 : let pgdata = &format!("{}.warmup", self.pgdata);
592 0 : create_pgdata(pgdata)?;
593 :
594 : // Run initdb to completion
595 0 : info!("running initdb");
596 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
597 0 : Command::new(initdb_bin)
598 0 : .args(["-D", pgdata])
599 0 : .output()
600 0 : .expect("cannot start initdb process");
601 0 :
602 0 : // Write conf
603 0 : use std::io::Write;
604 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
605 0 : let mut file = std::fs::File::create(conf_path)?;
606 0 : writeln!(file, "shared_buffers=65536")?;
607 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
608 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
609 :
610 : // Start postgres
611 0 : info!("starting postgres");
612 0 : let mut pg = maybe_cgexec(&self.pgbin)
613 0 : .args(["-D", pgdata])
614 0 : .spawn()
615 0 : .expect("cannot start postgres process");
616 0 :
617 0 : // Stop it when it's ready
618 0 : info!("waiting for postgres");
619 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
620 0 : pg.kill()?;
621 0 : info!("sent kill signal");
622 0 : pg.wait()?;
623 0 : info!("done prewarming");
624 :
625 : // clean up
626 0 : let _ok = fs::remove_dir_all(pgdata);
627 0 : Ok(())
628 0 : }
629 :
630 : /// Start Postgres as a child process and manage DBs/roles.
631 : /// After that this will hang waiting on the postmaster process to exit.
632 654 : #[instrument(skip_all)]
633 : pub fn start_postgres(
634 : &self,
635 : storage_auth_token: Option<String>,
636 : ) -> Result<std::process::Child> {
637 : let pgdata_path = Path::new(&self.pgdata);
638 :
639 : // Run postgres as a child process.
640 : let mut pg = maybe_cgexec(&self.pgbin)
641 : .args(["-D", &self.pgdata])
642 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
643 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
644 : } else {
645 : vec![]
646 : })
647 : .spawn()
648 : .expect("cannot start postgres process");
649 :
650 : wait_for_postgres(&mut pg, pgdata_path)?;
651 :
652 : Ok(pg)
653 : }
654 :
655 : /// Do initial configuration of the already started Postgres.
656 0 : #[instrument(skip_all)]
657 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
658 : // If connection fails,
659 : // it may be the old node with `zenith_admin` superuser.
660 : //
661 : // In this case we need to connect with old `zenith_admin` name
662 : // and create new user. We cannot simply rename connected user,
663 : // but we can create a new one and grant it all privileges.
664 : let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
665 : Err(e) => {
666 0 : info!(
667 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
668 0 : e
669 0 : );
670 : let mut zenith_admin_connstr = self.connstr.clone();
671 :
672 : zenith_admin_connstr
673 : .set_username("zenith_admin")
674 0 : .map_err(|_| anyhow::anyhow!("invalid connstr"))?;
675 :
676 : let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
677 : // Disable forwarding so that users don't get a cloud_admin role
678 : client.simple_query("SET neon.forward_ddl = false")?;
679 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
680 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
681 : drop(client);
682 :
683 : // reconnect with connsting with expected name
684 : Client::connect(self.connstr.as_str(), NoTls)?
685 : }
686 : Ok(client) => client,
687 : };
688 :
689 : // Disable DDL forwarding because control plane already knows about these roles/databases.
690 : client.simple_query("SET neon.forward_ddl = false")?;
691 :
692 : // Proceed with post-startup configuration. Note, that order of operations is important.
693 : let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
694 : create_neon_superuser(spec, &mut client)?;
695 : handle_roles(spec, &mut client)?;
696 : handle_databases(spec, &mut client)?;
697 : handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
698 : handle_grants(spec, self.connstr.as_str())?;
699 : handle_extensions(spec, &mut client)?;
700 : create_availability_check_data(&mut client)?;
701 :
702 : // 'Close' connection
703 : drop(client);
704 :
705 : Ok(())
706 : }
707 :
708 : // We could've wrapped this around `pg_ctl reload`, but right now we don't use
709 : // `pg_ctl` for start / stop, so this just seems much easier to do as we already
710 : // have opened connection to Postgres and superuser access.
711 0 : #[instrument(skip_all)]
712 : fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
713 : client.simple_query("SELECT pg_reload_conf()")?;
714 : Ok(())
715 : }
716 :
717 : /// Similar to `apply_config()`, but does a bit different sequence of operations,
718 : /// as it's used to reconfigure a previously started and configured Postgres node.
719 0 : #[instrument(skip_all)]
720 : pub fn reconfigure(&self) -> Result<()> {
721 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
722 :
723 : // Write new config
724 : let pgdata_path = Path::new(&self.pgdata);
725 : config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
726 :
727 : let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
728 : self.pg_reload_conf(&mut client)?;
729 :
730 : // Proceed with post-startup configuration. Note, that order of operations is important.
731 : // Disable DDL forwarding because control plane already knows about these roles/databases.
732 : if spec.mode == ComputeMode::Primary {
733 : client.simple_query("SET neon.forward_ddl = false")?;
734 : handle_roles(&spec, &mut client)?;
735 : handle_databases(&spec, &mut client)?;
736 : handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
737 : handle_grants(&spec, self.connstr.as_str())?;
738 : handle_extensions(&spec, &mut client)?;
739 : }
740 :
741 : // 'Close' connection
742 : drop(client);
743 :
744 : let unknown_op = "unknown".to_string();
745 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
746 0 : info!(
747 0 : "finished reconfiguration of compute node for operation {}",
748 0 : op_id
749 0 : );
750 :
751 : Ok(())
752 : }
753 :
754 663 : #[instrument(skip_all)]
755 : pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
756 : let compute_state = self.state.lock().unwrap().clone();
757 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
758 663 : info!(
759 663 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
760 663 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
761 663 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
762 663 : pspec.tenant_id,
763 663 : pspec.timeline_id,
764 663 : );
765 :
766 663 : info!(
767 663 : "start_compute spec.remote_extensions {:?}",
768 663 : pspec.spec.remote_extensions
769 663 : );
770 :
771 : // This part is sync, because we need to download
772 : // remote shared_preload_libraries before postgres start (if any)
773 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
774 : // First, create control files for all availale extensions
775 : extension_server::create_control_files(remote_extensions, &self.pgbin);
776 :
777 : let library_load_start_time = Utc::now();
778 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
779 :
780 : let library_load_time = Utc::now()
781 : .signed_duration_since(library_load_start_time)
782 : .to_std()
783 : .unwrap()
784 : .as_millis() as u64;
785 : let mut state = self.state.lock().unwrap();
786 : state.metrics.load_ext_ms = library_load_time;
787 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
788 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
789 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
790 0 : info!(
791 0 : "Loading shared_preload_libraries took {:?}ms",
792 0 : library_load_time
793 0 : );
794 0 : info!("{:?}", remote_ext_metrics);
795 : }
796 :
797 : self.prepare_pgdata(&compute_state, extension_server_port)?;
798 :
799 : let start_time = Utc::now();
800 : let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
801 :
802 : let config_time = Utc::now();
803 : if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
804 : self.apply_config(&compute_state)?;
805 : }
806 :
807 : let startup_end_time = Utc::now();
808 : {
809 : let mut state = self.state.lock().unwrap();
810 : state.metrics.start_postgres_ms = config_time
811 : .signed_duration_since(start_time)
812 : .to_std()
813 : .unwrap()
814 : .as_millis() as u64;
815 : state.metrics.config_ms = startup_end_time
816 : .signed_duration_since(config_time)
817 : .to_std()
818 : .unwrap()
819 : .as_millis() as u64;
820 : state.metrics.total_startup_ms = startup_end_time
821 : .signed_duration_since(compute_state.start_time)
822 : .to_std()
823 : .unwrap()
824 : .as_millis() as u64;
825 : }
826 : self.set_status(ComputeStatus::Running);
827 :
828 654 : info!(
829 654 : "finished configuration of compute for project {}",
830 654 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
831 654 : );
832 :
833 : // Log metrics so that we can search for slow operations in logs
834 : let metrics = {
835 : let state = self.state.lock().unwrap();
836 : state.metrics.clone()
837 : };
838 654 : info!(?metrics, "compute start finished");
839 :
840 : Ok(pg)
841 : }
842 :
843 : // Look for core dumps and collect backtraces.
844 : //
845 : // EKS worker nodes have following core dump settings:
846 : // /proc/sys/kernel/core_pattern -> core
847 : // /proc/sys/kernel/core_uses_pid -> 1
848 : // ulimint -c -> unlimited
849 : // which results in core dumps being written to postgres data directory as core.<pid>.
850 : //
851 : // Use that as a default location and pattern, except macos where core dumps are written
852 : // to /cores/ directory by default.
853 663 : pub fn check_for_core_dumps(&self) -> Result<()> {
854 663 : let core_dump_dir = match std::env::consts::OS {
855 663 : "macos" => Path::new("/cores/"),
856 663 : _ => Path::new(&self.pgdata),
857 : };
858 :
859 : // Collect core dump paths if any
860 663 : info!("checking for core dumps in {}", core_dump_dir.display());
861 663 : let files = fs::read_dir(core_dump_dir)?;
862 15828 : let cores = files.filter_map(|entry| {
863 15828 : let entry = entry.ok()?;
864 15828 : let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
865 0 : Some(entry.path())
866 15828 : });
867 :
868 : // Print backtrace for each core dump
869 663 : for core_path in cores {
870 0 : warn!(
871 0 : "core dump found: {}, collecting backtrace",
872 0 : core_path.display()
873 0 : );
874 :
875 : // Try first with gdb
876 0 : let backtrace = Command::new("gdb")
877 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
878 0 : .arg(&core_path)
879 0 : .output();
880 :
881 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
882 0 : let backtrace = match backtrace {
883 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
884 0 : warn!("cannot find gdb, trying lldb");
885 0 : Command::new("lldb")
886 0 : .arg("-c")
887 0 : .arg(&core_path)
888 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
889 0 : .output()
890 : }
891 0 : _ => backtrace,
892 0 : }?;
893 :
894 0 : warn!(
895 0 : "core dump backtrace: {}",
896 0 : String::from_utf8_lossy(&backtrace.stdout)
897 0 : );
898 0 : warn!(
899 0 : "debugger stderr: {}",
900 0 : String::from_utf8_lossy(&backtrace.stderr)
901 0 : );
902 : }
903 :
904 663 : Ok(())
905 663 : }
906 :
907 : /// Select `pg_stat_statements` data and return it as a stringified JSON
908 0 : pub async fn collect_insights(&self) -> String {
909 0 : let mut result_rows: Vec<String> = Vec::new();
910 0 : let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
911 0 : let (client, connection) = connect_result.unwrap();
912 0 : tokio::spawn(async move {
913 0 : if let Err(e) = connection.await {
914 0 : eprintln!("connection error: {}", e);
915 0 : }
916 0 : });
917 0 : let result = client
918 0 : .simple_query(
919 0 : "SELECT
920 0 : row_to_json(pg_stat_statements)
921 0 : FROM
922 0 : pg_stat_statements
923 0 : WHERE
924 0 : userid != 'cloud_admin'::regrole::oid
925 0 : ORDER BY
926 0 : (mean_exec_time + mean_plan_time) DESC
927 0 : LIMIT 100",
928 0 : )
929 0 : .await;
930 :
931 0 : if let Ok(raw_rows) = result {
932 0 : for message in raw_rows.iter() {
933 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
934 0 : if let Some(json) = row.get(0) {
935 0 : result_rows.push(json.to_string());
936 0 : }
937 0 : }
938 : }
939 :
940 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
941 : } else {
942 0 : "{{\"pg_stat_statements\": []}}".to_string()
943 : }
944 0 : }
945 :
946 : // download an archive, unzip and place files in correct locations
947 0 : pub async fn download_extension(
948 0 : &self,
949 0 : real_ext_name: String,
950 0 : ext_path: RemotePath,
951 0 : ) -> Result<u64, DownloadError> {
952 0 : let remote_storage = self
953 0 : .ext_remote_storage
954 0 : .as_ref()
955 0 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
956 0 : "Remote extensions storage is not configured",
957 0 : )))?;
958 :
959 0 : let ext_archive_name = ext_path.object_name().expect("bad path");
960 0 :
961 0 : let mut first_try = false;
962 0 : if !self
963 0 : .ext_download_progress
964 0 : .read()
965 0 : .expect("lock err")
966 0 : .contains_key(ext_archive_name)
967 0 : {
968 0 : self.ext_download_progress
969 0 : .write()
970 0 : .expect("lock err")
971 0 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
972 0 : first_try = true;
973 0 : }
974 0 : let (download_start, download_completed) =
975 0 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
976 0 : let start_time_delta = Utc::now()
977 0 : .signed_duration_since(download_start)
978 0 : .to_std()
979 0 : .unwrap()
980 0 : .as_millis() as u64;
981 0 :
982 0 : // how long to wait for extension download if it was started by another process
983 0 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
984 0 :
985 0 : if download_completed {
986 0 : info!("extension already downloaded, skipping re-download");
987 0 : return Ok(0);
988 0 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
989 0 : info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
990 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
991 : loop {
992 0 : info!("waiting for download");
993 0 : interval.tick().await;
994 0 : let (_, download_completed_now) =
995 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
996 0 : if download_completed_now {
997 0 : info!("download finished by whoever else downloaded it");
998 0 : return Ok(0);
999 0 : }
1000 : }
1001 : // NOTE: the above loop will get terminated
1002 : // based on the timeout of the download function
1003 0 : }
1004 :
1005 : // if extension hasn't been downloaded before or the previous
1006 : // attempt to download was at least HANG_TIMEOUT ms ago
1007 : // then we try to download it here
1008 0 : info!("downloading new extension {ext_archive_name}");
1009 :
1010 0 : let download_size = extension_server::download_extension(
1011 0 : &real_ext_name,
1012 0 : &ext_path,
1013 0 : remote_storage,
1014 0 : &self.pgbin,
1015 0 : )
1016 0 : .await
1017 0 : .map_err(DownloadError::Other);
1018 0 :
1019 0 : self.ext_download_progress
1020 0 : .write()
1021 0 : .expect("bad lock")
1022 0 : .insert(ext_archive_name.to_string(), (download_start, true));
1023 0 :
1024 0 : download_size
1025 0 : }
1026 :
1027 : #[tokio::main]
1028 0 : pub async fn prepare_preload_libraries(
1029 0 : &self,
1030 0 : spec: &ComputeSpec,
1031 0 : ) -> Result<RemoteExtensionMetrics> {
1032 0 : if self.ext_remote_storage.is_none() {
1033 0 : return Ok(RemoteExtensionMetrics {
1034 0 : num_ext_downloaded: 0,
1035 0 : largest_ext_size: 0,
1036 0 : total_ext_download_size: 0,
1037 0 : });
1038 0 : }
1039 0 : let remote_extensions = spec
1040 0 : .remote_extensions
1041 0 : .as_ref()
1042 0 : .ok_or(anyhow::anyhow!("Remote extensions are not configured",))?;
1043 :
1044 0 : info!("parse shared_preload_libraries from spec.cluster.settings");
1045 0 : let mut libs_vec = Vec::new();
1046 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1047 0 : libs_vec = libs
1048 0 : .split(&[',', '\'', ' '])
1049 0 : .filter(|s| *s != "neon" && !s.is_empty())
1050 0 : .map(str::to_string)
1051 0 : .collect();
1052 0 : }
1053 0 : info!("parse shared_preload_libraries from provided postgresql.conf");
1054 :
1055 : // that is used in neon_local and python tests
1056 0 : if let Some(conf) = &spec.cluster.postgresql_conf {
1057 0 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1058 0 : let mut shared_preload_libraries_line = "";
1059 0 : for line in conf_lines {
1060 0 : if line.starts_with("shared_preload_libraries") {
1061 0 : shared_preload_libraries_line = line;
1062 0 : }
1063 : }
1064 0 : let mut preload_libs_vec = Vec::new();
1065 0 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1066 0 : preload_libs_vec = libs
1067 0 : .split(&[',', '\'', ' '])
1068 0 : .filter(|s| *s != "neon" && !s.is_empty())
1069 0 : .map(str::to_string)
1070 0 : .collect();
1071 0 : }
1072 0 : libs_vec.extend(preload_libs_vec);
1073 0 : }
1074 :
1075 : // Don't try to download libraries that are not in the index.
1076 : // Assume that they are already present locally.
1077 0 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1078 0 :
1079 0 : info!("Downloading to shared preload libraries: {:?}", &libs_vec);
1080 :
1081 0 : let mut download_tasks = Vec::new();
1082 0 : for library in &libs_vec {
1083 0 : let (ext_name, ext_path) =
1084 0 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1085 0 : download_tasks.push(self.download_extension(ext_name, ext_path));
1086 : }
1087 0 : let results = join_all(download_tasks).await;
1088 :
1089 0 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1090 0 : num_ext_downloaded: 0,
1091 0 : largest_ext_size: 0,
1092 0 : total_ext_download_size: 0,
1093 0 : };
1094 0 : for result in results {
1095 0 : let download_size = match result {
1096 0 : Ok(res) => {
1097 0 : remote_ext_metrics.num_ext_downloaded += 1;
1098 0 : res
1099 : }
1100 0 : Err(err) => {
1101 0 : // if we failed to download an extension, we don't want to fail the whole
1102 0 : // process, but we do want to log the error
1103 0 : error!("Failed to download extension: {}", err);
1104 0 : 0
1105 : }
1106 : };
1107 :
1108 0 : remote_ext_metrics.largest_ext_size =
1109 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1110 0 : remote_ext_metrics.total_ext_download_size += download_size;
1111 : }
1112 0 : Ok(remote_ext_metrics)
1113 : }
1114 : }
|