TLA Line data Source code
1 : use std::collections::HashMap;
2 : use std::env;
3 : use std::fs;
4 : use std::io::BufRead;
5 : use std::os::unix::fs::PermissionsExt;
6 : use std::path::Path;
7 : use std::process::{Command, Stdio};
8 : use std::str::FromStr;
9 : use std::sync::{Condvar, Mutex, RwLock};
10 : use std::time::Instant;
11 :
12 : use anyhow::{Context, Result};
13 : use chrono::{DateTime, Utc};
14 : use futures::future::join_all;
15 : use futures::stream::FuturesUnordered;
16 : use futures::StreamExt;
17 : use postgres::{Client, NoTls};
18 : use tokio;
19 : use tokio_postgres;
20 : use tracing::{error, info, instrument, warn};
21 : use utils::id::{TenantId, TimelineId};
22 : use utils::lsn::Lsn;
23 :
24 : use compute_api::responses::{ComputeMetrics, ComputeStatus};
25 : use compute_api::spec::{ComputeMode, ComputeSpec};
26 : use utils::measured_stream::MeasuredReader;
27 :
28 : use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
29 :
30 : use crate::checker::create_availability_check_data;
31 : use crate::pg_helpers::*;
32 : use crate::spec::*;
33 : use crate::sync_sk::{check_if_synced, ping_safekeeper};
34 : use crate::{config, extension_server};
35 :
36 : /// Compute node info shared across several `compute_ctl` threads.
37 : pub struct ComputeNode {
38 : // Url type maintains proper escaping
39 : pub connstr: url::Url,
40 : pub pgdata: String,
41 : pub pgbin: String,
42 : pub pgversion: String,
43 : /// We should only allow live configuration / reconfiguration of the compute node
44 : /// if it uses the 'pull model', i.e. it can go to the control plane and fetch
45 : /// the latest configuration. Otherwise, there could be a case:
46 : /// - we start compute with some spec provided as an argument
47 : /// - we push a new spec and it does reconfiguration
48 : /// - but then something happens and the compute pod / VM is destroyed,
49 : /// so the k8s controller starts it again with the **old** spec
50 : /// The same applies to empty computes:
51 : /// - we started compute without any spec
52 : /// - we push a spec and it does configuration
53 : /// - but then it is restarted without any spec again
54 : pub live_config_allowed: bool,
55 : /// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
56 : /// To allow the HTTP API server to serve status requests while configuration
57 : /// is in progress, the lock should be held only for short periods of time to
58 : /// do reads/writes, not for the whole configuration process.
59 : pub state: Mutex<ComputeState>,
60 : /// `Condvar` to allow notifying waiters about state changes.
61 : pub state_changed: Condvar,
62 : /// the S3 bucket that we search for extensions in
63 : pub ext_remote_storage: Option<GenericRemoteStorage>,
64 : // Key: ext_archive_name; value: (time the download started, whether it has completed).
65 : pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
66 : pub build_tag: String,
67 : }
68 :
69 : // store some metrics about download size that might impact startup time
70 UBC 0 : #[derive(Clone, Debug)]
71 : pub struct RemoteExtensionMetrics {
72 : num_ext_downloaded: u64,
73 : largest_ext_size: u64,
74 : total_ext_download_size: u64,
75 : }
76 :
77 CBC 1282 : #[derive(Clone, Debug)]
78 : pub struct ComputeState {
79 : pub start_time: DateTime<Utc>,
80 : pub status: ComputeStatus,
81 : /// Timestamp of the last Postgres activity. It could be `None` if
82 : /// the compute wasn't used since start.
83 : pub last_active: Option<DateTime<Utc>>,
84 : pub error: Option<String>,
85 : pub pspec: Option<ParsedSpec>,
86 : pub metrics: ComputeMetrics,
87 : }
88 :
89 : impl ComputeState {
90 641 : pub fn new() -> Self {
91 641 : Self {
92 641 : start_time: Utc::now(),
93 641 : status: ComputeStatus::Empty,
94 641 : last_active: None,
95 641 : error: None,
96 641 : pspec: None,
97 641 : metrics: ComputeMetrics::default(),
98 641 : }
99 641 : }
100 : }
101 :
102 : impl Default for ComputeState {
103 UBC 0 : fn default() -> Self {
104 0 : Self::new()
105 0 : }
106 : }
107 :
108 CBC 1834 : #[derive(Clone, Debug)]
109 : pub struct ParsedSpec {
110 : pub spec: ComputeSpec,
111 : pub tenant_id: TenantId,
112 : pub timeline_id: TimelineId,
113 : pub pageserver_connstr: String,
114 : pub safekeeper_connstrings: Vec<String>,
115 : pub storage_auth_token: Option<String>,
116 : }
117 :
118 : impl TryFrom<ComputeSpec> for ParsedSpec {
119 : type Error = String;
120 641 : fn try_from(spec: ComputeSpec) -> Result<Self, String> {
121 : // Extract the options from the spec file that are needed to connect to
122 : // the storage system.
123 : //
124 : // For backwards-compatibility, the top-level fields in the spec file
125 : // may be empty. In that case, we need to dig them from the GUCs in the
126 : // cluster.settings field.
127 641 : let pageserver_connstr = spec
128 641 : .pageserver_connstring
129 641 : .clone()
130 641 : .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
131 641 : .ok_or("pageserver connstr should be provided")?;
132 641 : let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
133 89 : if matches!(spec.mode, ComputeMode::Primary) {
134 UBC 0 : spec.cluster
135 0 : .settings
136 0 : .find("neon.safekeepers")
137 0 : .ok_or("safekeeper connstrings should be provided")?
138 0 : .split(',')
139 0 : .map(|str| str.to_string())
140 0 : .collect()
141 : } else {
142 CBC 89 : vec![]
143 : }
144 : } else {
145 552 : spec.safekeeper_connstrings.clone()
146 : };
147 641 : let storage_auth_token = spec.storage_auth_token.clone();
148 641 : let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
149 641 : tenant_id
150 : } else {
151 UBC 0 : spec.cluster
152 0 : .settings
153 0 : .find("neon.tenant_id")
154 0 : .ok_or("tenant id should be provided")
155 0 : .map(|s| TenantId::from_str(&s))?
156 0 : .or(Err("invalid tenant id"))?
157 : };
158 CBC 641 : let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
159 641 : timeline_id
160 : } else {
161 UBC 0 : spec.cluster
162 0 : .settings
163 0 : .find("neon.timeline_id")
164 0 : .ok_or("timeline id should be provided")
165 0 : .map(|s| TimelineId::from_str(&s))?
166 0 : .or(Err("invalid timeline id"))?
167 : };
168 :
169 CBC 641 : Ok(ParsedSpec {
170 641 : spec,
171 641 : pageserver_connstr,
172 641 : safekeeper_connstrings,
173 641 : storage_auth_token,
174 641 : tenant_id,
175 641 : timeline_id,
176 641 : })
177 641 : }
178 : }
179 :
180 : /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres`
181 : /// cgroup. Otherwise returns the default `Command::new(cmd)`
182 : ///
183 : /// This function should be used to start postgres, as it will start it in the
184 : /// neon-postgres cgroup if we are a VM. This allows autoscaling to control
185 : /// postgres' resource usage. The cgroup will exist in VMs because vm-builder
186 : /// creates it during the sysinit phase of its inittab.
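: /// E.g. in a VM, `maybe_cgexec("postgres")` yields the command line `cgexec -g memory:neon-postgres postgres`.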
187 1590 : fn maybe_cgexec(cmd: &str) -> Command {
188 1590 : // The cplane sets this env var for autoscaling computes.
189 1590 : // Use `var_os` so we don't have to worry about the variable being valid
190 1590 : // unicode. That should never be a concern... but just in case.
191 1590 : if env::var_os("AUTOSCALING").is_some() {
192 UBC 0 : let mut command = Command::new("cgexec");
193 0 : command.args(["-g", "memory:neon-postgres"]);
194 0 : command.arg(cmd);
195 0 : command
196 : } else {
197 CBC 1590 : Command::new(cmd)
198 : }
199 1590 : }
200 :
201 : /// Create the special `neon_superuser` role: a slightly nerfed version of a real
202 : /// superuser that we give to customers.
203 1 : fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
204 1 : let roles = spec
205 1 : .cluster
206 1 : .roles
207 1 : .iter()
208 1 : .map(|r| escape_literal(&r.name))
209 1 : .collect::<Vec<_>>();
210 1 :
211 1 : let dbs = spec
212 1 : .cluster
213 1 : .databases
214 1 : .iter()
215 1 : .map(|db| escape_literal(&db.name))
216 1 : .collect::<Vec<_>>();
217 :
218 1 : let roles_decl = if roles.is_empty() {
219 1 : String::from("roles text[] := NULL;")
220 : } else {
221 UBC 0 : format!(
222 0 : r#"
223 0 : roles text[] := ARRAY(SELECT rolname
224 0 : FROM pg_catalog.pg_roles
225 0 : WHERE rolname IN ({}));"#,
226 0 : roles.join(", ")
227 0 : )
228 : };
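: // E.g. for hypothetical roles ["alice", "bob"] the branch above yields:
: //   roles text[] := ARRAY(SELECT rolname FROM pg_catalog.pg_roles WHERE rolname IN ('alice', 'bob'));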
229 :
230 CBC 1 : let database_decl = if dbs.is_empty() {
231 1 : String::from("dbs text[] := NULL;")
232 : } else {
233 UBC 0 : format!(
234 0 : r#"
235 0 : dbs text[] := ARRAY(SELECT datname
236 0 : FROM pg_catalog.pg_database
237 0 : WHERE datname IN ({}));"#,
238 0 : dbs.join(", ")
239 0 : )
240 : };
241 :
242 : // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
243 : // (see https://www.postgresql.org/docs/current/ddl-priv.html)
244 CBC 1 : let query = format!(
245 1 : r#"
246 1 : DO $$
247 1 : DECLARE
248 1 : r text;
249 1 : {}
250 1 : {}
251 1 : BEGIN
252 1 : IF NOT EXISTS (
253 1 : SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
254 1 : THEN
255 1 : CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION IN ROLE pg_read_all_data, pg_write_all_data;
256 1 : IF array_length(roles, 1) IS NOT NULL THEN
257 1 : EXECUTE format('GRANT neon_superuser TO %s',
258 1 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
259 1 : FOREACH r IN ARRAY roles LOOP
260 1 : EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
261 1 : END LOOP;
262 1 : END IF;
263 1 : IF array_length(dbs, 1) IS NOT NULL THEN
264 1 : EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
265 1 : array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
266 1 : END IF;
267 1 : END IF;
268 1 : END
269 1 : $$;"#,
270 1 : roles_decl, database_decl,
271 1 : );
272 1 : info!("Neon superuser created:\n{}", &query);
273 1 : client
274 1 : .simple_query(&query)
275 1 : .map_err(|e| anyhow::anyhow!(e).context(query))?;
276 1 : Ok(())
277 1 : }
278 :
279 : impl ComputeNode {
280 632 : pub fn set_status(&self, status: ComputeStatus) {
281 632 : let mut state = self.state.lock().unwrap();
282 632 : state.status = status;
283 632 : self.state_changed.notify_all();
284 632 : }
285 :
286 UBC 0 : pub fn get_status(&self) -> ComputeStatus {
287 0 : self.state.lock().unwrap().status
288 0 : }
289 :
290 : // Remove the `pgdata` directory and create it again with the right permissions.
291 CBC 641 : fn create_pgdata(&self) -> Result<()> {
292 641 : // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
293 641 : // If it is something different, create_dir() will error out anyway.
294 641 : let _ok = fs::remove_dir_all(&self.pgdata);
295 641 : fs::create_dir(&self.pgdata)?;
296 641 : fs::set_permissions(&self.pgdata, fs::Permissions::from_mode(0o700))?;
297 :
298 641 : Ok(())
299 641 : }
300 :
301 : // Get a basebackup from the pageserver over a libpq connection using `connstr`,
302 : // and unpack it into the `pgdata` directory, overwriting all of its previous content.
303 641 : #[instrument(skip_all, fields(%lsn))]
304 : fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
305 : let spec = compute_state.pspec.as_ref().expect("spec must be set");
306 : let start_time = Instant::now();
307 :
308 : let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
309 :
310 : // Use the storage auth token from the config file, if given.
311 : // Note: this overrides any password set in the connection string.
312 : if let Some(storage_auth_token) = &spec.storage_auth_token {
313 15 : info!("Got storage auth token from spec file");
314 : config.password(storage_auth_token);
315 : } else {
316 626 : info!("Storage auth token not set");
317 : }
318 :
319 : // Connect to pageserver
320 : let mut client = config.connect(NoTls)?;
321 : let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
322 :
323 : let basebackup_cmd = match lsn {
324 : // HACK We don't use compression on first start (Lsn(0)) because there's no API for it
325 : Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
326 : _ => format!(
327 : "basebackup {} {} {} --gzip",
328 : spec.tenant_id, spec.timeline_id, lsn
329 : ),
330 : };
331 :
332 : let copyreader = client.copy_out(basebackup_cmd.as_str())?;
333 : let mut measured_reader = MeasuredReader::new(copyreader);
334 :
335 : // Check the magic number to see if it's a gzip or not. Even though
336 : // we might explicitly ask for gzip, an old pageserver with no implementation
337 : // of gzip compression might send us uncompressed data. After some time
338 : // passes we can assume all pageservers know how to compress and we can
339 : // delete this check.
340 : //
341 : // If the data is not gzip, it will be tar. It will not be mistakenly
342 : // recognized as gzip because tar starts with an ascii encoding of a filename,
343 : // and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
344 : // we send the "global" directory first from the pageserver, so it definitely
345 : // won't be recognized as gzip.
346 : let mut bufreader = std::io::BufReader::new(&mut measured_reader);
347 : let gzip = {
348 : let peek = bufreader.fill_buf().unwrap();
349 : peek.len() >= 2 && peek[0] == 0x1f && peek[1] == 0x8b // guard against a short/empty stream
350 : };
351 :
352 : // Read the archive directly from the `CopyOutReader`
353 : //
354 : // Set `ignore_zeros` so that unpack() reads all the Copy data and
355 : // doesn't stop at the end-of-archive marker. Otherwise, if the server
356 : // sends an Error after finishing the tarball, we will not notice it.
357 : if gzip {
358 : let mut ar = tar::Archive::new(flate2::read::GzDecoder::new(&mut bufreader));
359 : ar.set_ignore_zeros(true);
360 : ar.unpack(&self.pgdata)?;
361 : } else {
362 : let mut ar = tar::Archive::new(&mut bufreader);
363 : ar.set_ignore_zeros(true);
364 : ar.unpack(&self.pgdata)?;
365 : };
366 :
367 : // Report metrics
368 : let mut state = self.state.lock().unwrap();
369 : state.metrics.pageserver_connect_micros = pageserver_connect_micros;
370 : state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
371 : state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
372 : Ok(())
373 : }
374 :
375 552 : pub async fn check_safekeepers_synced_async(
376 552 : &self,
377 552 : compute_state: &ComputeState,
378 552 : ) -> Result<Option<Lsn>> {
379 552 : // Construct a connection config for each safekeeper
380 552 : let pspec: ParsedSpec = compute_state
381 552 : .pspec
382 552 : .as_ref()
383 552 : .expect("spec must be set")
384 552 : .clone();
385 552 : let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
386 749 : let sk_configs = sk_connstrs.into_iter().map(|connstr| {
387 749 : // Format connstr
388 749 : let id = connstr.clone();
389 749 : let connstr = format!("postgresql://no_user@{}", connstr);
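: // e.g. a (hypothetical) safekeeper address "sk-1:5454" becomes "postgresql://no_user@sk-1:5454"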
390 749 : let options = format!(
391 749 : "-c timeline_id={} tenant_id={}",
392 749 : pspec.timeline_id, pspec.tenant_id
393 749 : );
394 749 :
395 749 : // Construct client
396 749 : let mut config = tokio_postgres::Config::from_str(&connstr).unwrap();
397 749 : config.options(&options);
398 749 : if let Some(storage_auth_token) = pspec.storage_auth_token.clone() {
399 26 : config.password(storage_auth_token);
400 723 : }
401 :
402 749 : (id, config)
403 749 : });
404 552 :
405 552 : // Create task set to query all safekeepers
406 552 : let mut tasks = FuturesUnordered::new();
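: // Majority quorum: floor(n/2) + 1 of the n safekeepers must respond (e.g. 2 out of 3).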
407 552 : let quorum = sk_configs.len() / 2 + 1;
408 1301 : for (id, config) in sk_configs {
409 749 : let timeout = tokio::time::Duration::from_millis(100);
410 749 : let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
411 749 : tasks.push(tokio::spawn(task));
412 749 : }
413 :
414 : // Get a quorum of responses or errors
415 552 : let mut responses = Vec::new();
416 552 : let mut join_errors = Vec::new();
417 552 : let mut task_errors = Vec::new();
418 552 : let mut timeout_errors = Vec::new();
419 1174 : while let Some(response) = tasks.next().await {
420 656 : match response {
421 652 : Ok(Ok(Ok(r))) => responses.push(r),
422 4 : Ok(Ok(Err(e))) => task_errors.push(e),
423 UBC 0 : Ok(Err(e)) => timeout_errors.push(e),
424 0 : Err(e) => join_errors.push(e),
425 : };
426 CBC 656 : if responses.len() >= quorum {
427 552 : break;
428 104 : }
429 104 : if join_errors.len() + task_errors.len() + timeout_errors.len() >= quorum {
430 UBC 0 : break;
431 CBC 104 : }
432 : }
433 :
434 : // In case of error, log and fail the check, but don't crash.
435 : // We're playing it safe because these errors could be transient
436 : // and we don't yet retry. Also being careful here allows us to
437 : // be backwards compatible with safekeepers that don't have the
438 : // TIMELINE_STATUS API yet.
439 552 : if responses.len() < quorum {
440 UBC 0 : error!(
441 0 : "failed sync safekeepers check {:?} {:?} {:?}",
442 0 : join_errors, task_errors, timeout_errors
443 0 : );
444 0 : return Ok(None);
445 CBC 552 : }
446 552 :
447 552 : Ok(check_if_synced(responses))
448 552 : }
449 :
450 : // Fast path for sync_safekeepers. If they're already synced we get the LSN
451 : // in one roundtrip. If not, we should do a full sync_safekeepers.
452 552 : pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
453 552 : let start_time = Utc::now();
454 552 :
455 552 : // Run actual work with new tokio runtime
456 552 : let rt = tokio::runtime::Builder::new_current_thread()
457 552 : .enable_all()
458 552 : .build()
459 552 : .expect("failed to create rt");
460 552 : let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
461 552 :
462 552 : // Record runtime
463 552 : self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
464 552 : .signed_duration_since(start_time)
465 552 : .to_std()
466 552 : .unwrap()
467 552 : .as_millis() as u64;
468 552 : result
469 552 : }
470 :
471 : // Run `postgres` in a special mode with `--sync-safekeepers` argument
472 : // and return the reported LSN back to the caller.
473 958 : #[instrument(skip_all)]
474 : pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
475 : let start_time = Utc::now();
476 :
477 : let sync_handle = maybe_cgexec(&self.pgbin)
478 : .args(["--sync-safekeepers"])
479 : .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
480 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
481 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
482 : } else {
483 : vec![]
484 : })
485 : .stdout(Stdio::piped())
486 : .spawn()
487 : .expect("postgres --sync-safekeepers failed to start");
488 :
489 : // `postgres --sync-safekeepers` will print all log output to stderr and
490 : // final LSN to stdout. So we pipe only stdout, while stderr will be automatically
491 : // redirected to the caller output.
492 : let sync_output = sync_handle
493 : .wait_with_output()
494 : .expect("postgres --sync-safekeepers failed");
495 :
496 : if !sync_output.status.success() {
497 : anyhow::bail!(
498 : "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
499 : sync_output.status,
500 : String::from_utf8(sync_output.stdout)
501 : .expect("postgres --sync-safekeepers exited, and stdout is not utf-8"),
502 : );
503 : }
504 :
505 : self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
506 : .signed_duration_since(start_time)
507 : .to_std()
508 : .unwrap()
509 : .as_millis() as u64;
510 :
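: // stdout holds the LSN in the usual Postgres "X/Y" text form, e.g. "0/1696128".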
511 : let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
512 :
513 : Ok(lsn)
514 : }
515 :
516 : /// Do all the preparations like PGDATA directory creation, configuration,
517 : /// safekeepers sync, basebackup, etc.
518 641 : #[instrument(skip_all)]
519 : pub fn prepare_pgdata(
520 : &self,
521 : compute_state: &ComputeState,
522 : extension_server_port: u16,
523 : ) -> Result<()> {
524 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
525 : let spec = &pspec.spec;
526 : let pgdata_path = Path::new(&self.pgdata);
527 :
528 : // Remove/create an empty pgdata directory and put configuration there.
529 : self.create_pgdata()?;
530 : config::write_postgres_conf(
531 : &pgdata_path.join("postgresql.conf"),
532 : &pspec.spec,
533 : Some(extension_server_port),
534 : )?;
535 :
536 : // Syncing safekeepers is only safe with primary nodes: if a primary
537 : // is already connected it will be kicked out, so a secondary (standby)
538 : // cannot sync safekeepers.
539 : let lsn = match spec.mode {
540 : ComputeMode::Primary => {
541 552 : info!("checking if safekeepers are synced");
542 : let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
543 : lsn
544 : } else {
545 406 : info!("starting safekeepers syncing");
546 : self.sync_safekeepers(pspec.storage_auth_token.clone())
547 UBC 0 : .with_context(|| "failed to sync safekeepers")?
548 : };
549 CBC 552 : info!("safekeepers synced at LSN {}", lsn);
550 : lsn
551 : }
552 : ComputeMode::Static(lsn) => {
553 88 : info!("Starting read-only node at static LSN {}", lsn);
554 : lsn
555 : }
556 : ComputeMode::Replica => {
557 1 : info!("Initializing standby from latest Pageserver LSN");
558 : Lsn(0)
559 : }
560 : };
561 :
562 641 : info!(
563 641 : "getting basebackup@{} from pageserver {}",
564 641 : lsn, &pspec.pageserver_connstr
565 641 : );
566 9 : self.get_basebackup(compute_state, lsn).with_context(|| {
567 9 : format!(
568 9 : "failed to get basebackup@{} from pageserver {}",
569 9 : lsn, &pspec.pageserver_connstr
570 9 : )
571 9 : })?;
572 :
573 : // Update pg_hba.conf received with basebackup.
574 : update_pg_hba(pgdata_path)?;
575 :
576 : match spec.mode {
577 : ComputeMode::Primary => {}
578 : ComputeMode::Replica | ComputeMode::Static(..) => {
579 : add_standby_signal(pgdata_path)?;
580 : }
581 : }
582 :
583 : Ok(())
584 : }
585 :
586 : /// Start and stop a postgres process to warm up the VM for startup.
587 UBC 0 : pub fn prewarm_postgres(&self) -> Result<()> {
588 0 : info!("prewarming");
589 :
590 : // Create pgdata
591 0 : let pgdata = &format!("{}.warmup", self.pgdata);
592 0 : create_pgdata(pgdata)?;
593 :
594 : // Run initdb to completion
595 0 : info!("running initdb");
596 0 : let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
597 0 : Command::new(initdb_bin)
598 0 : .args(["-D", pgdata])
599 0 : .output()
600 0 : .expect("cannot start initdb process");
601 0 :
602 0 : // Write conf
603 0 : use std::io::Write;
604 0 : let conf_path = Path::new(pgdata).join("postgresql.conf");
605 0 : let mut file = std::fs::File::create(conf_path)?;
606 0 : writeln!(file, "shared_buffers=65536")?;
607 0 : writeln!(file, "port=51055")?; // Nobody should be connecting
608 0 : writeln!(file, "shared_preload_libraries = 'neon'")?;
609 :
610 : // Start postgres
611 0 : info!("starting postgres");
612 0 : let mut pg = maybe_cgexec(&self.pgbin)
613 0 : .args(["-D", pgdata])
614 0 : .spawn()
615 0 : .expect("cannot start postgres process");
616 0 :
617 0 : // Stop it when it's ready
618 0 : info!("waiting for postgres");
619 0 : wait_for_postgres(&mut pg, Path::new(pgdata))?;
620 0 : pg.kill()?;
621 0 : info!("sent kill signal");
622 0 : pg.wait()?;
623 0 : info!("done prewarming");
624 :
625 : // clean up
626 0 : let _ok = fs::remove_dir_all(pgdata);
627 0 : Ok(())
628 0 : }
629 :
630 : /// Start Postgres as a child process, wait until it is ready to accept
631 : /// connections, and return the child handle; the caller waits for it to exit.
632 CBC 632 : #[instrument(skip_all)]
633 : pub fn start_postgres(
634 : &self,
635 : storage_auth_token: Option<String>,
636 : ) -> Result<std::process::Child> {
637 : let pgdata_path = Path::new(&self.pgdata);
638 :
639 : // Run postgres as a child process.
640 : let mut pg = maybe_cgexec(&self.pgbin)
641 : .args(["-D", &self.pgdata])
642 : .envs(if let Some(storage_auth_token) = &storage_auth_token {
643 : vec![("NEON_AUTH_TOKEN", storage_auth_token)]
644 : } else {
645 : vec![]
646 : })
647 : .spawn()
648 : .expect("cannot start postgres process");
649 :
650 : wait_for_postgres(&mut pg, pgdata_path)?;
651 :
652 : Ok(pg)
653 : }
654 :
655 : /// Do initial configuration of the already started Postgres.
656 1 : #[instrument(skip_all)]
657 : pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
658 : // If connection fails,
659 : // it may be the old node with `zenith_admin` superuser.
660 : //
661 : // In this case we need to connect with old `zenith_admin` name
662 : // and create new user. We cannot simply rename connected user,
663 : // but we can create a new one and grant it all privileges.
664 : let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
665 : Err(e) => {
666 UBC 0 : info!(
667 0 : "cannot connect to postgres: {}, retrying with `zenith_admin` username",
668 0 : e
669 0 : );
670 : let mut zenith_admin_connstr = self.connstr.clone();
671 :
672 : zenith_admin_connstr
673 : .set_username("zenith_admin")
674 0 : .map_err(|_| anyhow::anyhow!("invalid connstr"))?;
675 :
676 : let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
677 : // Disable forwarding so that users don't get a cloud_admin role
678 : client.simple_query("SET neon.forward_ddl = false")?;
679 : client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
680 : client.simple_query("GRANT zenith_admin TO cloud_admin")?;
681 : drop(client);
682 :
683 : // reconnect with connsting with expected name
684 : Client::connect(self.connstr.as_str(), NoTls)?
685 : }
686 : Ok(client) => client,
687 : };
688 :
689 : // Disable DDL forwarding because control plane already knows about these roles/databases.
690 : client.simple_query("SET neon.forward_ddl = false")?;
691 :
692 : // Proceed with post-startup configuration. Note, that order of operations is important.
693 : let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
694 : create_neon_superuser(spec, &mut client)?;
695 : cleanup_instance(&mut client)?;
696 : handle_roles(spec, &mut client)?;
697 : handle_databases(spec, &mut client)?;
698 : handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
699 : handle_grants(spec, &mut client, self.connstr.as_str())?;
700 : handle_extensions(spec, &mut client)?;
701 : create_availability_check_data(&mut client)?;
702 :
703 : // 'Close' connection
704 : drop(client);
705 :
706 : Ok(())
707 : }
708 :
709 : // We could've wrapped this around `pg_ctl reload`, but right now we don't use
710 : // `pg_ctl` for start / stop, so this just seems much easier to do as we already
711 : // have an open connection to Postgres and superuser access.
712 0 : #[instrument(skip_all)]
713 : fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
714 : client.simple_query("SELECT pg_reload_conf()")?;
715 : Ok(())
716 : }
717 :
718 : /// Similar to `apply_config()`, but does a bit different sequence of operations,
719 : /// as it's used to reconfigure a previously started and configured Postgres node.
720 0 : #[instrument(skip_all)]
721 : pub fn reconfigure(&self) -> Result<()> {
722 : let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
723 :
724 : // Write new config
725 : let pgdata_path = Path::new(&self.pgdata);
726 : config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
727 :
728 : let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
729 : self.pg_reload_conf(&mut client)?;
730 :
731 : // Proceed with post-startup configuration. Note, that order of operations is important.
732 : // Disable DDL forwarding because control plane already knows about these roles/databases.
733 : if spec.mode == ComputeMode::Primary {
734 : client.simple_query("SET neon.forward_ddl = false")?;
735 : cleanup_instance(&mut client)?;
736 : handle_roles(&spec, &mut client)?;
737 : handle_databases(&spec, &mut client)?;
738 : handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
739 : handle_grants(&spec, &mut client, self.connstr.as_str())?;
740 : handle_extensions(&spec, &mut client)?;
741 : }
742 :
743 : // 'Close' connection
744 : drop(client);
745 :
746 : let unknown_op = "unknown".to_string();
747 : let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
748 0 : info!(
749 0 : "finished reconfiguration of compute node for operation {}",
750 0 : op_id
751 0 : );
752 :
753 : Ok(())
754 : }
755 :
756 CBC 641 : #[instrument(skip_all)]
757 : pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
758 : let compute_state = self.state.lock().unwrap().clone();
759 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
760 641 : info!(
761 641 : "starting compute for project {}, operation {}, tenant {}, timeline {}",
762 641 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
763 641 : pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
764 641 : pspec.tenant_id,
765 641 : pspec.timeline_id,
766 641 : );
767 :
768 641 : info!(
769 641 : "start_compute spec.remote_extensions {:?}",
770 641 : pspec.spec.remote_extensions
771 641 : );
772 :
773 : // This part is sync, because we need to download
774 : // remote shared_preload_libraries before postgres starts (if any).
775 : if let Some(remote_extensions) = &pspec.spec.remote_extensions {
776 : // First, create control files for all available extensions
777 : extension_server::create_control_files(remote_extensions, &self.pgbin);
778 :
779 : let library_load_start_time = Utc::now();
780 : let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
781 :
782 : let library_load_time = Utc::now()
783 : .signed_duration_since(library_load_start_time)
784 : .to_std()
785 : .unwrap()
786 : .as_millis() as u64;
787 : let mut state = self.state.lock().unwrap();
788 : state.metrics.load_ext_ms = library_load_time;
789 : state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
790 : state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
791 : state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
792 UBC 0 : info!(
793 0 : "Loading shared_preload_libraries took {:?}ms",
794 0 : library_load_time
795 0 : );
796 0 : info!("{:?}", remote_ext_metrics);
797 : }
798 :
799 : self.prepare_pgdata(&compute_state, extension_server_port)?;
800 :
801 : let start_time = Utc::now();
802 : let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
803 :
804 : let config_time = Utc::now();
805 : if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
806 : self.apply_config(&compute_state)?;
807 : }
808 :
809 : let startup_end_time = Utc::now();
810 : {
811 : let mut state = self.state.lock().unwrap();
812 : state.metrics.start_postgres_ms = config_time
813 : .signed_duration_since(start_time)
814 : .to_std()
815 : .unwrap()
816 : .as_millis() as u64;
817 : state.metrics.config_ms = startup_end_time
818 : .signed_duration_since(config_time)
819 : .to_std()
820 : .unwrap()
821 : .as_millis() as u64;
822 : state.metrics.total_startup_ms = startup_end_time
823 : .signed_duration_since(compute_state.start_time)
824 : .to_std()
825 : .unwrap()
826 : .as_millis() as u64;
827 : }
828 : self.set_status(ComputeStatus::Running);
829 :
830 CBC 632 : info!(
831 632 : "finished configuration of compute for project {}",
832 632 : pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
833 632 : );
834 :
835 : // Log metrics so that we can search for slow operations in logs
836 : let metrics = {
837 : let state = self.state.lock().unwrap();
838 : state.metrics.clone()
839 : };
840 632 : info!(?metrics, "compute start finished");
841 :
842 : Ok(pg)
843 : }
844 :
845 : // Look for core dumps and collect backtraces.
846 : //
847 : // EKS worker nodes have following core dump settings:
848 : // /proc/sys/kernel/core_pattern -> core
849 : // /proc/sys/kernel/core_uses_pid -> 1
850 : // ulimint -c -> unlimited
851 : // which results in core dumps being written to postgres data directory as core.<pid>.
852 : //
853 : // Use that as a default location and pattern, except macos where core dumps are written
854 : // to /cores/ directory by default.
855 641 : pub fn check_for_core_dumps(&self) -> Result<()> {
856 641 : let core_dump_dir = match std::env::consts::OS {
857 641 : "macos" => Path::new("/cores/"),
858 641 : _ => Path::new(&self.pgdata),
859 : };
860 :
861 : // Collect core dump paths if any
862 641 : info!("checking for core dumps in {}", core_dump_dir.display());
863 641 : let files = fs::read_dir(core_dump_dir)?;
864 15298 : let cores = files.filter_map(|entry| {
865 15298 : let entry = entry.ok()?;
866 15298 : let _ = entry.file_name().to_str()?.strip_prefix("core.")?;
867 UBC 0 : Some(entry.path())
868 CBC 15298 : });
869 :
870 : // Print backtrace for each core dump
871 641 : for core_path in cores {
872 UBC 0 : warn!(
873 0 : "core dump found: {}, collecting backtrace",
874 0 : core_path.display()
875 0 : );
876 :
877 : // Try first with gdb
878 0 : let backtrace = Command::new("gdb")
879 0 : .args(["--batch", "-q", "-ex", "bt", &self.pgbin])
880 0 : .arg(&core_path)
881 0 : .output();
882 :
883 : // Try lldb if no gdb is found -- that is handy for local testing on macOS
884 0 : let backtrace = match backtrace {
885 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
886 0 : warn!("cannot find gdb, trying lldb");
887 0 : Command::new("lldb")
888 0 : .arg("-c")
889 0 : .arg(&core_path)
890 0 : .args(["--batch", "-o", "bt all", "-o", "quit"])
891 0 : .output()
892 : }
893 0 : _ => backtrace,
894 0 : }?;
895 :
896 0 : warn!(
897 0 : "core dump backtrace: {}",
898 0 : String::from_utf8_lossy(&backtrace.stdout)
899 0 : );
900 0 : warn!(
901 0 : "debugger stderr: {}",
902 0 : String::from_utf8_lossy(&backtrace.stderr)
903 0 : );
904 : }
905 :
906 CBC 641 : Ok(())
907 641 : }
908 :
909 : /// Select `pg_stat_statements` data and return it as stringified JSON.
910 UBC 0 : pub async fn collect_insights(&self) -> String {
911 0 : let mut result_rows: Vec<String> = Vec::new();
912 0 : let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await;
913 0 : let (client, connection) = connect_result.unwrap();
914 0 : tokio::spawn(async move {
915 0 : if let Err(e) = connection.await {
916 0 : eprintln!("connection error: {}", e);
917 0 : }
918 0 : });
919 0 : let result = client
920 0 : .simple_query(
921 0 : "SELECT
922 0 : row_to_json(pg_stat_statements)
923 0 : FROM
924 0 : pg_stat_statements
925 0 : WHERE
926 0 : userid != 'cloud_admin'::regrole::oid
927 0 : ORDER BY
928 0 : (mean_exec_time + mean_plan_time) DESC
929 0 : LIMIT 100",
930 0 : )
931 0 : .await;
932 :
933 0 : if let Ok(raw_rows) = result {
934 0 : for message in raw_rows.iter() {
935 0 : if let postgres::SimpleQueryMessage::Row(row) = message {
936 0 : if let Some(json) = row.get(0) {
937 0 : result_rows.push(json.to_string());
938 0 : }
939 0 : }
940 : }
941 :
942 0 : format!("{{\"pg_stat_statements\": [{}]}}", result_rows.join(","))
943 : } else {
944 0 : "{{\"pg_stat_statements\": []}}".to_string()
945 : }
946 0 : }
947 :
948 : // Download an archive, unpack it, and place its files in the correct locations.
949 0 : pub async fn download_extension(
950 0 : &self,
951 0 : real_ext_name: String,
952 0 : ext_path: RemotePath,
953 0 : ) -> Result<u64, DownloadError> {
954 0 : let remote_storage = self
955 0 : .ext_remote_storage
956 0 : .as_ref()
957 0 : .ok_or(DownloadError::BadInput(anyhow::anyhow!(
958 0 : "Remote extensions storage is not configured",
959 0 : )))?;
960 :
961 0 : let ext_archive_name = ext_path.object_name().expect("bad path");
962 0 :
963 0 : let mut first_try = false;
964 0 : if !self
965 0 : .ext_download_progress
966 0 : .read()
967 0 : .expect("lock err")
968 0 : .contains_key(ext_archive_name)
969 0 : {
970 0 : self.ext_download_progress
971 0 : .write()
972 0 : .expect("lock err")
973 0 : .insert(ext_archive_name.to_string(), (Utc::now(), false));
974 0 : first_try = true;
975 0 : }
976 0 : let (download_start, download_completed) =
977 0 : self.ext_download_progress.read().expect("lock err")[ext_archive_name];
978 0 : let start_time_delta = Utc::now()
979 0 : .signed_duration_since(download_start)
980 0 : .to_std()
981 0 : .unwrap()
982 0 : .as_millis() as u64;
983 0 :
984 0 : // how long to wait for extension download if it was started by another process
985 0 : const HANG_TIMEOUT: u64 = 3000; // milliseconds
986 0 :
987 0 : if download_completed {
988 0 : info!("extension already downloaded, skipping re-download");
989 0 : return Ok(0);
990 0 : } else if start_time_delta < HANG_TIMEOUT && !first_try {
991 0 : info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
992 0 : let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
993 : loop {
994 0 : info!("waiting for download");
995 0 : interval.tick().await;
996 0 : let (_, download_completed_now) =
997 0 : self.ext_download_progress.read().expect("lock")[ext_archive_name];
998 0 : if download_completed_now {
999 0 : info!("download finished by whoever else downloaded it");
1000 0 : return Ok(0);
1001 0 : }
1002 : }
1003 : // NOTE: the above loop will get terminated
1004 : // based on the timeout of the download function
1005 0 : }
1006 :
1007 : // if extension hasn't been downloaded before or the previous
1008 : // attempt to download was at least HANG_TIMEOUT ms ago
1009 : // then we try to download it here
1010 0 : info!("downloading new extension {ext_archive_name}");
1011 :
1012 0 : let download_size = extension_server::download_extension(
1013 0 : &real_ext_name,
1014 0 : &ext_path,
1015 0 : remote_storage,
1016 0 : &self.pgbin,
1017 0 : )
1018 0 : .await
1019 0 : .map_err(DownloadError::Other);
1020 0 :
1021 0 : self.ext_download_progress
1022 0 : .write()
1023 0 : .expect("bad lock")
1024 0 : .insert(ext_archive_name.to_string(), (download_start, true));
1025 0 :
1026 0 : download_size
1027 0 : }
1028 :
1029 : #[tokio::main]
1030 0 : pub async fn prepare_preload_libraries(
1031 0 : &self,
1032 0 : spec: &ComputeSpec,
1033 0 : ) -> Result<RemoteExtensionMetrics> {
1034 0 : if self.ext_remote_storage.is_none() {
1035 0 : return Ok(RemoteExtensionMetrics {
1036 0 : num_ext_downloaded: 0,
1037 0 : largest_ext_size: 0,
1038 0 : total_ext_download_size: 0,
1039 0 : });
1040 0 : }
1041 0 : let remote_extensions = spec
1042 0 : .remote_extensions
1043 0 : .as_ref()
1044 0 : .ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
1045 :
1046 0 : info!("parse shared_preload_libraries from spec.cluster.settings");
1047 0 : let mut libs_vec = Vec::new();
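: // e.g. a GUC value of 'neon,pg_stat_statements' is split into ["pg_stat_statements"]:
: // the "neon" entry and empty fragments are filtered out below.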
1048 0 : if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
1049 0 : libs_vec = libs
1050 0 : .split(&[',', '\'', ' '])
1051 0 : .filter(|s| *s != "neon" && !s.is_empty())
1052 0 : .map(str::to_string)
1053 0 : .collect();
1054 0 : }
1055 0 : info!("parse shared_preload_libraries from provided postgresql.conf");
1056 :
1057 : // This is used in neon_local and Python tests.
1058 0 : if let Some(conf) = &spec.cluster.postgresql_conf {
1059 0 : let conf_lines = conf.split('\n').collect::<Vec<&str>>();
1060 0 : let mut shared_preload_libraries_line = "";
1061 0 : for line in conf_lines {
1062 0 : if line.starts_with("shared_preload_libraries") {
1063 0 : shared_preload_libraries_line = line;
1064 0 : }
1065 : }
1066 0 : let mut preload_libs_vec = Vec::new();
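: // split("='").nth(1) takes everything after "shared_preload_libraries='", e.g.
: // "neon,pg_stat_statements'"; the trailing quote becomes an empty fragment and is filtered out.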
1067 0 : if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
1068 0 : preload_libs_vec = libs
1069 0 : .split(&[',', '\'', ' '])
1070 0 : .filter(|s| *s != "neon" && !s.is_empty())
1071 0 : .map(str::to_string)
1072 0 : .collect();
1073 0 : }
1074 0 : libs_vec.extend(preload_libs_vec);
1075 0 : }
1076 :
1077 : // Don't try to download libraries that are not in the index.
1078 : // Assume that they are already present locally.
1079 0 : libs_vec.retain(|lib| remote_extensions.library_index.contains_key(lib));
1080 0 :
1081 0 : info!("Downloading to shared preload libraries: {:?}", &libs_vec);
1082 :
1083 0 : let mut download_tasks = Vec::new();
1084 0 : for library in &libs_vec {
1085 0 : let (ext_name, ext_path) =
1086 0 : remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
1087 0 : download_tasks.push(self.download_extension(ext_name, ext_path));
1088 : }
1089 0 : let results = join_all(download_tasks).await;
1090 :
1091 0 : let mut remote_ext_metrics = RemoteExtensionMetrics {
1092 0 : num_ext_downloaded: 0,
1093 0 : largest_ext_size: 0,
1094 0 : total_ext_download_size: 0,
1095 0 : };
1096 0 : for result in results {
1097 0 : let download_size = match result {
1098 0 : Ok(res) => {
1099 0 : remote_ext_metrics.num_ext_downloaded += 1;
1100 0 : res
1101 : }
1102 0 : Err(err) => {
1103 0 : // if we failed to download an extension, we don't want to fail the whole
1104 0 : // process, but we do want to log the error
1105 0 : error!("Failed to download extension: {}", err);
1106 0 : 0
1107 : }
1108 : };
1109 :
1110 0 : remote_ext_metrics.largest_ext_size =
1111 0 : std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
1112 0 : remote_ext_metrics.total_ext_download_size += download_size;
1113 : }
1114 0 : Ok(remote_ext_metrics)
1115 : }
1116 : }
|