LCOV - code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 0.0 % 267 0
Test Date: 2024-11-13 18:23:39 Functions: 0.0 % 67 0

            Line data    Source code
       1              : //! Spawns and kills background processes that are needed by Neon CLI.
       2              : //! Applies common set-up such as log and pid files (if needed) to every process.
       3              : //!
       4              : //! Neon CLI does not run in background, so it needs to store the information about
       5              : //! spawned processes, which it does in this module.
       6              : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7              : //! The pid file can be created by the process itself
       8              : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
       9              : //! or we create such file after starting the process
      10              : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11              : //! The pid stored in the file is later used to stop the service.
      12              : //!
      13              : //! See the [`lock_file`](utils::lock_file) module for more info.
      14              : 
      15              : use std::ffi::OsStr;
      16              : use std::io::Write;
      17              : use std::os::unix::prelude::AsRawFd;
      18              : use std::os::unix::process::CommandExt;
      19              : use std::path::Path;
      20              : use std::process::Command;
      21              : use std::time::Duration;
      22              : use std::{fs, io, thread};
      23              : 
      24              : use anyhow::Context;
      25              : use camino::{Utf8Path, Utf8PathBuf};
      26              : use nix::errno::Errno;
      27              : use nix::fcntl::{FcntlArg, FdFlag};
      28              : use nix::sys::signal::{kill, Signal};
      29              : use nix::unistd::Pid;
      30              : use utils::pid_file::{self, PidFileRead};
      31              : 
      32              : // These constants control the loop used to poll for process start / stop.
      33              : //
      34              : // The loop waits for at most 10 seconds, polling every 100 ms.
      35              : // Once a second, it prints a dot ("."), to give the user an indication that
      36              : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      37              : // it prints a notice that it's taking long, but keeps waiting.
      38              : //
      39              : const STOP_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
      40              : const STOP_RETRIES: u128 = STOP_RETRY_TIMEOUT.as_millis() / RETRY_INTERVAL.as_millis();
      41              : const RETRY_INTERVAL: Duration = Duration::from_millis(100);
      42              : const DOT_EVERY_RETRIES: u128 = 10;
      43              : const NOTICE_AFTER_RETRIES: u128 = 50;
      44              : 
      45              : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      46              : /// it itself.
      47              : pub enum InitialPidFile {
      48              :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      49              :     Create(Utf8PathBuf),
      50              :     /// The process will create the pidfile itself, need to wait for that event.
      51              :     Expect(Utf8PathBuf),
      52              : }
      53              : 
      54              : /// Start a background child process using the parameters given.
      55              : #[allow(clippy::too_many_arguments)]
      56            0 : pub async fn start_process<F, Fut, AI, A, EI>(
      57            0 :     process_name: &str,
      58            0 :     datadir: &Path,
      59            0 :     command: &Path,
      60            0 :     args: AI,
      61            0 :     envs: EI,
      62            0 :     initial_pid_file: InitialPidFile,
      63            0 :     retry_timeout: &Duration,
      64            0 :     process_status_check: F,
      65            0 : ) -> anyhow::Result<()>
      66            0 : where
      67            0 :     F: Fn() -> Fut,
      68            0 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
      69            0 :     AI: IntoIterator<Item = A>,
      70            0 :     A: AsRef<OsStr>,
      71            0 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      72            0 :     EI: IntoIterator<Item = (String, String)>,
      73            0 : {
      74            0 :     let retries: u128 = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
      75            0 :     if !datadir.metadata().context("stat datadir")?.is_dir() {
      76            0 :         anyhow::bail!("`datadir` must be a directory when calling this function: {datadir:?}");
      77            0 :     }
      78            0 :     let log_path = datadir.join(format!("{process_name}.log"));
      79            0 :     let process_log_file = fs::OpenOptions::new()
      80            0 :         .create(true)
      81            0 :         .append(true)
      82            0 :         .open(&log_path)
      83            0 :         .with_context(|| {
      84            0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      85            0 :         })?;
      86            0 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      87            0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      88            0 :     })?;
      89              : 
      90            0 :     let mut command = Command::new(command);
      91            0 :     let background_command = command
      92            0 :         .stdout(process_log_file)
      93            0 :         .stderr(same_file_for_stderr)
      94            0 :         .args(args)
      95            0 :         // spawn all child processes in their datadir, useful for all kinds of things,
      96            0 :         // not least cleaning up child processes e.g. after an unclean exit from the test suite:
      97            0 :         // ```
      98            0 :         // lsof  -d cwd -a +D  Users/cs/src/neon/test_output
      99            0 :         // ```
     100            0 :         .current_dir(datadir);
     101            0 : 
     102            0 :     let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
     103            0 :         fill_rust_env_vars(background_command),
     104            0 :     ));
     105            0 :     filled_cmd.envs(envs);
     106              : 
     107            0 :     let pid_file_to_check = match &initial_pid_file {
     108            0 :         InitialPidFile::Create(path) => {
     109            0 :             pre_exec_create_pidfile(filled_cmd, path);
     110            0 :             path
     111              :         }
     112            0 :         InitialPidFile::Expect(path) => path,
     113              :     };
     114              : 
     115            0 :     let spawned_process = filled_cmd.spawn().with_context(|| {
     116            0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     117            0 :     })?;
     118            0 :     let pid = spawned_process.id();
     119            0 :     let pid = Pid::from_raw(
     120            0 :         i32::try_from(pid)
     121            0 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     122              :     );
     123              :     // set up a scopeguard to kill & wait for the child in case we panic or bail below
     124            0 :     let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
     125            0 :         println!("SIGKILL & wait the started process");
     126            0 :         (|| {
     127            0 :             // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
     128            0 :             spawned_process.kill().context("SIGKILL child")?;
     129            0 :             spawned_process.wait().context("wait() for child process")?;
     130            0 :             anyhow::Ok(())
     131            0 :         })()
     132            0 :         .with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
     133            0 :         .unwrap();
     134            0 :     });
     135              : 
     136            0 :     for retries in 0..retries {
     137            0 :         match process_started(pid, pid_file_to_check, &process_status_check).await {
     138              :             Ok(true) => {
     139            0 :                 println!("\n{process_name} started and passed status check, pid: {pid}");
     140            0 :                 // leak the child process, it'll outlive this neon_local invocation
     141            0 :                 drop(scopeguard::ScopeGuard::into_inner(spawned_process));
     142            0 :                 return Ok(());
     143              :             }
     144              :             Ok(false) => {
     145            0 :                 if retries == NOTICE_AFTER_RETRIES {
     146            0 :                     // The process is taking a long time to start up. Keep waiting, but
     147            0 :                     // print a message
     148            0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     149            0 :                 }
     150            0 :                 if retries % DOT_EVERY_RETRIES == 0 {
     151            0 :                     print!(".");
     152            0 :                     io::stdout().flush().unwrap();
     153            0 :                 }
     154            0 :                 tokio::time::sleep(RETRY_INTERVAL).await;
     155              :             }
     156            0 :             Err(e) => {
     157            0 :                 println!("error starting process {process_name:?}: {e:#}");
     158            0 :                 return Err(e);
     159              :             }
     160              :         }
     161              :     }
     162            0 :     println!();
     163            0 :     anyhow::bail!(format!(
     164            0 :         "{} did not start+pass status checks within {:?} seconds",
     165            0 :         process_name, retry_timeout
     166            0 :     ));
     167            0 : }
     168              : 
     169              : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     170            0 : pub fn stop_process(
     171            0 :     immediate: bool,
     172            0 :     process_name: &str,
     173            0 :     pid_file: &Utf8Path,
     174            0 : ) -> anyhow::Result<()> {
     175            0 :     let pid = match pid_file::read(pid_file)
     176            0 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     177              :     {
     178              :         PidFileRead::NotExist => {
     179            0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     180            0 :             return Ok(());
     181              :         }
     182              :         PidFileRead::NotHeldByAnyProcess(_) => {
     183              :             // Don't try to kill according to file contents beacuse the pid might have been re-used by another process.
     184              :             // Don't delete the file either, it can race with new pid file creation.
     185              :             // Read `pid_file` module comment for details.
     186            0 :             println!(
     187            0 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     188            0 :             );
     189            0 :             return Ok(());
     190              :         }
     191            0 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     192              :     };
     193              :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     194              : 
     195              :     // send signal
     196            0 :     let sig = if immediate {
     197            0 :         print!("Stopping {process_name} with pid {pid} immediately..");
     198            0 :         Signal::SIGQUIT
     199              :     } else {
     200            0 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     201            0 :         Signal::SIGTERM
     202              :     };
     203            0 :     io::stdout().flush().unwrap();
     204            0 :     match kill(pid, sig) {
     205            0 :         Ok(()) => (),
     206              :         Err(Errno::ESRCH) => {
     207              :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     208            0 :             println!(
     209            0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     210            0 :             );
     211            0 :             return Ok(());
     212              :         }
     213            0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     214              :     }
     215              : 
     216              :     // Wait until process is gone
     217            0 :     wait_until_stopped(process_name, pid)?;
     218            0 :     Ok(())
     219            0 : }
     220              : 
     221            0 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     222            0 :     for retries in 0..STOP_RETRIES {
     223            0 :         match process_has_stopped(pid) {
     224              :             Ok(true) => {
     225            0 :                 println!("\n{process_name} stopped");
     226            0 :                 return Ok(());
     227              :             }
     228              :             Ok(false) => {
     229            0 :                 if retries == NOTICE_AFTER_RETRIES {
     230            0 :                     // The process is taking a long time to start up. Keep waiting, but
     231            0 :                     // print a message
     232            0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     233            0 :                 }
     234            0 :                 if retries % DOT_EVERY_RETRIES == 0 {
     235            0 :                     print!(".");
     236            0 :                     io::stdout().flush().unwrap();
     237            0 :                 }
     238            0 :                 thread::sleep(RETRY_INTERVAL);
     239              :             }
     240            0 :             Err(e) => {
     241            0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     242            0 :                 return Err(e);
     243              :             }
     244              :         }
     245              :     }
     246            0 :     println!();
     247            0 :     anyhow::bail!(format!(
     248            0 :         "{} with pid {} did not stop in {:?} seconds",
     249            0 :         process_name, pid, STOP_RETRY_TIMEOUT
     250            0 :     ));
     251            0 : }
     252              : 
     253            0 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     254            0 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     255            0 :     // to RUST_BACKTRACE=1.
     256            0 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     257            0 :     let backtrace_setting = backtrace_setting
     258            0 :         .as_deref()
     259            0 :         .unwrap_or_else(|| OsStr::new("1"));
     260            0 : 
     261            0 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     262              : 
     263              :     // Pass through these environment variables to the command
     264            0 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     265            0 :         if let Some(val) = std::env::var_os(var) {
     266            0 :             filled_cmd = filled_cmd.env(var, val);
     267            0 :         }
     268              :     }
     269              : 
     270            0 :     filled_cmd
     271            0 : }
     272              : 
     273            0 : fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     274            0 :     for env_key in [
     275              :         "AWS_ACCESS_KEY_ID",
     276            0 :         "AWS_SECRET_ACCESS_KEY",
     277            0 :         "AWS_PROFILE",
     278            0 :         // HOME is needed in combination with `AWS_PROFILE` to pick up the SSO sessions.
     279            0 :         "HOME",
     280            0 :         "AZURE_STORAGE_ACCOUNT",
     281            0 :         "AZURE_STORAGE_ACCESS_KEY",
     282              :     ] {
     283            0 :         if let Ok(value) = std::env::var(env_key) {
     284            0 :             cmd = cmd.env(env_key, value);
     285            0 :         }
     286              :     }
     287            0 :     cmd
     288            0 : }
     289              : 
     290            0 : fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
     291            0 :     for (var, val) in std::env::vars() {
     292            0 :         if var.starts_with("NEON_") {
     293            0 :             cmd = cmd.env(var, val);
     294            0 :         }
     295              :     }
     296            0 :     cmd
     297            0 : }
     298              : 
     299              : /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
     300              : /// 1. Claims a pidfile with a fcntl lock on it and
     301              : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
     302              : ///    will remain held until the cmd exits.
     303            0 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
     304            0 : where
     305            0 :     P: Into<Utf8PathBuf>,
     306            0 : {
     307            0 :     let path: Utf8PathBuf = path.into();
     308            0 :     // SAFETY:
     309            0 :     // pre_exec is marked unsafe because it runs between fork and exec.
     310            0 :     // Why is that dangerous in various ways?
     311            0 :     // Long answer:  https://github.com/rust-lang/rust/issues/39575
     312            0 :     // Short answer: in a multi-threaded program, other threads may have
     313            0 :     // been inside of critical sections at the time of fork. In the
     314            0 :     // original process, that was allright, assuming they protected
     315            0 :     // the critical sections appropriately, e.g., through locks.
     316            0 :     // Fork adds another process to the mix that
     317            0 :     //   1. Has a single thread T
     318            0 :     //   2. In an exact copy of the address space at the time of fork.
     319            0 :     // A variety of problems scan occur now:
     320            0 :     //   1. T tries to grab a lock that was locked at the time of fork.
     321            0 :     //      It will wait forever since in its address space, the lock
     322            0 :     //      is in state 'taken' but the thread that would unlock it is
     323            0 :     //      not there.
     324            0 :     //   2. A rust object that represented some external resource in the
     325            0 :     //      parent now got implicitly copied by the fork, even though
     326            0 :     //      the object's type is not `Copy`. The parent program may use
     327            0 :     //      non-copyability as way to enforce unique ownership of an
     328            0 :     //      external resource in the typesystem. The fork breaks that
     329            0 :     //      assumption, as now both parent and child process have an
     330            0 :     //      owned instance of the object that represents the same
     331            0 :     //      underlying resource.
     332            0 :     // While these seem like niche problems, (1) in particular is
     333            0 :     // highly relevant. For example, `malloc()` may grab a mutex internally,
     334            0 :     // and so, if we forked while another thread was mallocing' and our
     335            0 :     // pre_exec closure allocates as well, it will block on the malloc
     336            0 :     // mutex forever
     337            0 :     //
     338            0 :     // The proper solution is to only use C library functions that are marked
     339            0 :     // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
     340            0 :     //
     341            0 :     // With this specific pre_exec() closure, the non-error path doesn't allocate.
     342            0 :     // The error path uses `anyhow`, and hence does allocate.
     343            0 :     // We take our chances there, hoping that any potential disaster is constrained
     344            0 :     // to the child process (e.g., malloc has no state ourside of the child process).
     345            0 :     // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
     346            0 :     // Again, we take our chances, making the same assumptions as for malloc.
     347            0 :     unsafe {
     348            0 :         cmd.pre_exec(move || {
     349            0 :             let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
     350            0 :             // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
     351            0 :             // remains locked after exec.
     352            0 :             nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
     353            0 :                 .expect("remove FD_CLOEXEC");
     354            0 :             // Don't run drop(file), it would close the file before we actually exec.
     355            0 :             std::mem::forget(file);
     356            0 :             Ok(())
     357            0 :         });
     358            0 :     }
     359            0 :     cmd
     360            0 : }
     361              : 
     362            0 : async fn process_started<F, Fut>(
     363            0 :     pid: Pid,
     364            0 :     pid_file_to_check: &Utf8Path,
     365            0 :     status_check: &F,
     366            0 : ) -> anyhow::Result<bool>
     367            0 : where
     368            0 :     F: Fn() -> Fut,
     369            0 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
     370            0 : {
     371            0 :     match status_check().await {
     372            0 :         Ok(true) => match pid_file::read(pid_file_to_check)? {
     373            0 :             PidFileRead::NotExist => Ok(false),
     374            0 :             PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     375            0 :             PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     376              :         },
     377            0 :         Ok(false) => Ok(false),
     378            0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     379              :     }
     380            0 : }
     381              : 
     382            0 : pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     383            0 :     match kill(pid, None) {
     384              :         // Process exists, keep waiting
     385            0 :         Ok(_) => Ok(false),
     386              :         // Process not found, we're done
     387            0 :         Err(Errno::ESRCH) => Ok(true),
     388            0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     389              :     }
     390            0 : }
        

Generated by: LCOV version 2.1-beta