LCOV - code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 79.8 % 243 194
Test Date: 2024-02-07 07:37:29 Functions: 33.8 % 77 26

            Line data    Source code
       1              : //! Spawns and kills background processes that are needed by Neon CLI.
       2              : //! Applies common set-up such as log and pid files (if needed) to every process.
       3              : //!
       4              : //! Neon CLI does not run in background, so it needs to store the information about
       5              : //! spawned processes, which it does in this module.
       6              : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7              : //! The pid file can be created by the process itself
       8              : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
       9              : //! or we create such file after starting the process
      10              : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11              : //! The pid stored in the file is later used to stop the service.
      12              : //!
      13              : //! See the [`lock_file`](utils::lock_file) module for more info.
      14              : 
      15              : use std::ffi::OsStr;
      16              : use std::io::Write;
      17              : use std::os::unix::prelude::AsRawFd;
      18              : use std::os::unix::process::CommandExt;
      19              : use std::path::Path;
      20              : use std::process::Command;
      21              : use std::time::Duration;
      22              : use std::{fs, io, thread};
      23              : 
      24              : use anyhow::Context;
      25              : use camino::{Utf8Path, Utf8PathBuf};
      26              : use nix::errno::Errno;
      27              : use nix::fcntl::{FcntlArg, FdFlag};
      28              : use nix::sys::signal::{kill, Signal};
      29              : use nix::unistd::Pid;
      30              : use utils::pid_file::{self, PidFileRead};
      31              : 
      32              : // These constants control the loop used to poll for process start / stop.
      33              : //
      34              : // The loop waits for at most 10 seconds, polling every 100 ms.
      35              : // Once a second, it prints a dot ("."), to give the user an indication that
      36              : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      37              : // it prints a notice that it's taking long, but keeps waiting.
      38              : //
      39              : const RETRY_UNTIL_SECS: u64 = 10;
      40              : const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
      41              : const RETRY_INTERVAL_MILLIS: u64 = 100;
      42              : const DOT_EVERY_RETRIES: u64 = 10;
      43              : const NOTICE_AFTER_RETRIES: u64 = 50;
      44              : 
      45              : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      46              : /// it itself.
      47              : pub enum InitialPidFile {
      48              :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      49              :     Create(Utf8PathBuf),
      50              :     /// The process will create the pidfile itself, need to wait for that event.
      51              :     Expect(Utf8PathBuf),
      52              : }
      53              : 
      54              : /// Start a background child process using the parameters given.
      55         1843 : pub async fn start_process<F, Fut, AI, A, EI>(
      56         1843 :     process_name: &str,
      57         1843 :     datadir: &Path,
      58         1843 :     command: &Path,
      59         1843 :     args: AI,
      60         1843 :     envs: EI,
      61         1843 :     initial_pid_file: InitialPidFile,
      62         1843 :     process_status_check: F,
      63         1843 : ) -> anyhow::Result<()>
      64         1843 : where
      65         1843 :     F: Fn() -> Fut,
      66         1843 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
      67         1843 :     AI: IntoIterator<Item = A>,
      68         1843 :     A: AsRef<OsStr>,
      69         1843 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      70         1843 :     EI: IntoIterator<Item = (String, String)>,
      71         1843 : {
      72         1843 :     let log_path = datadir.join(format!("{process_name}.log"));
      73         1843 :     let process_log_file = fs::OpenOptions::new()
      74         1843 :         .create(true)
      75         1843 :         .write(true)
      76         1843 :         .append(true)
      77         1843 :         .open(&log_path)
      78         1843 :         .with_context(|| {
      79            0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      80         1843 :         })?;
      81         1843 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      82            0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      83         1843 :     })?;
      84              : 
      85         1843 :     let mut command = Command::new(command);
      86         1843 :     let background_command = command
      87         1843 :         .stdout(process_log_file)
      88         1843 :         .stderr(same_file_for_stderr)
      89         1843 :         .args(args);
      90         1843 :     let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
      91         1843 :     filled_cmd.envs(envs);
      92              : 
      93         1843 :     let pid_file_to_check = match &initial_pid_file {
      94          725 :         InitialPidFile::Create(path) => {
      95          725 :             pre_exec_create_pidfile(filled_cmd, path);
      96          725 :             path
      97              :         }
      98         1118 :         InitialPidFile::Expect(path) => path,
      99              :     };
     100              : 
     101         1843 :     let spawned_process = filled_cmd.spawn().with_context(|| {
     102            0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     103         1843 :     })?;
     104         1843 :     let pid = spawned_process.id();
     105         1843 :     let pid = Pid::from_raw(
     106         1843 :         i32::try_from(pid)
     107         1843 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     108              :     );
     109              :     // set up a scopeguard to kill & wait for the child in case we panic or bail below
     110         1843 :     let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
     111            0 :         println!("SIGKILL & wait the started process");
     112            0 :         (|| {
     113            0 :             // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e.g., walredo).
     114            0 :             spawned_process.kill().context("SIGKILL child")?;
     115            0 :             spawned_process.wait().context("wait() for child process")?;
     116            0 :             anyhow::Ok(())
     117            0 :         })()
     118            0 :         .with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
     119            0 :         .unwrap();
     120         1843 :     });
     121              : 
     122         3717 :     for retries in 0..RETRIES {
     123         8273 :         match process_started(pid, pid_file_to_check, &process_status_check).await {
     124              :             Ok(true) => {
     125         1843 :                 println!("\n{process_name} started and passed status check, pid: {pid}");
     126         1843 :                 // leak the child process, it'll outlive this neon_local invocation
     127         1843 :                 drop(scopeguard::ScopeGuard::into_inner(spawned_process));
     128         1843 :                 return Ok(());
     129              :             }
     130              :             Ok(false) => {
     131         1874 :                 if retries == NOTICE_AFTER_RETRIES {
     132            0 :                     // The process is taking a long time to start up. Keep waiting, but
     133            0 :                     // print a message
     134            0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     135         1874 :                 }
     136         1874 :                 if retries % DOT_EVERY_RETRIES == 0 {
     137         1842 :                     print!(".");
     138         1842 :                     io::stdout().flush().unwrap();
     139         1842 :                 }
     140         1874 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     141              :             }
     142            0 :             Err(e) => {
     143            0 :                 println!("error starting process {process_name:?}: {e:#}");
     144            0 :                 return Err(e);
     145              :             }
     146              :         }
     147              :     }
     148            0 :     println!();
     149            0 :     anyhow::bail!(
     150            0 :         "{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds"
     151            0 :     );
     152         1843 : }
     153              : 
     154              : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     155         1521 : pub fn stop_process(
     156         1521 :     immediate: bool,
     157         1521 :     process_name: &str,
     158         1521 :     pid_file: &Utf8Path,
     159         1521 : ) -> anyhow::Result<()> {
     160         1521 :     let pid = match pid_file::read(pid_file)
     161         1521 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     162              :     {
     163              :         PidFileRead::NotExist => {
     164            0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     165            0 :             return Ok(());
     166              :         }
     167              :         PidFileRead::NotHeldByAnyProcess(_) => {
     168              :             // Don't try to kill according to file contents because the pid might have been re-used by another process.
     169              :             // Don't delete the file either, it can race with new pid file creation.
     170              :             // Read `pid_file` module comment for details.
     171           45 :             println!(
     172           45 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     173           45 :             );
     174           45 :             return Ok(());
     175              :         }
     176         1476 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     177              :     };
     178              :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     179              : 
     180              :     // send signal
     181         1476 :     let sig = if immediate {
     182         1177 :         print!("Stopping {process_name} with pid {pid} immediately..");
     183         1177 :         Signal::SIGQUIT
     184              :     } else {
     185          299 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     186          299 :         Signal::SIGTERM
     187              :     };
     188         1476 :     io::stdout().flush().unwrap();
     189         1476 :     match kill(pid, sig) {
     190         1476 :         Ok(()) => (),
     191              :         Err(Errno::ESRCH) => {
     192              :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     193            0 :             println!(
     194            0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     195            0 :             );
     196            0 :             return Ok(());
     197              :         }
     198            0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     199              :     }
     200              : 
     201              :     // Wait until process is gone
     202         1476 :     wait_until_stopped(process_name, pid)?;
     203         1476 :     Ok(())
     204         1521 : }
     205              : 
     206         2052 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     207         3680 :     for retries in 0..RETRIES {
     208         3680 :         match process_has_stopped(pid) {
     209              :             Ok(true) => {
     210         2052 :                 println!("\n{process_name} stopped");
     211         2052 :                 return Ok(());
     212              :             }
     213              :             Ok(false) => {
     214         1628 :                 if retries == NOTICE_AFTER_RETRIES {
     215            0 :                     // The process is taking a long time to stop. Keep waiting, but
     216            0 :                     // print a message
     217            0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     218         1628 :                 }
     219         1628 :                 if retries % DOT_EVERY_RETRIES == 0 {
     220         1491 :                     print!(".");
     221         1491 :                     io::stdout().flush().unwrap();
     222         1491 :                 }
     223         1628 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     224              :             }
     225            0 :             Err(e) => {
     226            0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     227            0 :                 return Err(e);
     228              :             }
     229              :         }
     230              :     }
     231            0 :     println!();
     232            0 :     anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
     233         2052 : }
     234              : 
     235         1843 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     236         1843 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     237         1843 :     // to RUST_BACKTRACE=1.
     238         1843 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     239         1843 :     let backtrace_setting = backtrace_setting
     240         1843 :         .as_deref()
     241         1843 :         .unwrap_or_else(|| OsStr::new("1"));
     242         1843 : 
     243         1843 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     244              : 
     245              :     // Pass through these environment variables to the command
     246         5529 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     247         5529 :         if let Some(val) = std::env::var_os(var) {
     248         1851 :             filled_cmd = filled_cmd.env(var, val);
     249         3678 :         }
     250              :     }
     251              : 
     252         1843 :     filled_cmd
     253         1843 : }
     254              : 
     255         1843 : fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     256        11058 :     for env_key in [
     257              :         "AWS_ACCESS_KEY_ID",
     258         1843 :         "AWS_SECRET_ACCESS_KEY",
     259         1843 :         "AWS_PROFILE",
     260         1843 :         // HOME is needed in combination with `AWS_PROFILE` to pick up the SSO sessions.
     261         1843 :         "HOME",
     262         1843 :         "AZURE_STORAGE_ACCOUNT",
     263         1843 :         "AZURE_STORAGE_ACCESS_KEY",
     264              :     ] {
     265        11058 :         if let Ok(value) = std::env::var(env_key) {
     266         5529 :             cmd = cmd.env(env_key, value);
     267         5529 :         }
     268              :     }
     269         1843 :     cmd
     270         1843 : }
     271              : 
     272              : /// Add a `pre_exec` to the cmd that, in between fork() and exec(),
     273              : /// 1. Claims a pidfile with a fcntl lock on it and
     274              : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
     275              : ///    will remain held until the cmd exits.
     276          725 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
     277          725 : where
     278          725 :     P: Into<Utf8PathBuf>,
     279          725 : {
     280          725 :     let path: Utf8PathBuf = path.into();
     281          725 :     // SAFETY:
     282          725 :     // pre_exec is marked unsafe because it runs between fork and exec.
     283          725 :     // Why is that dangerous in various ways?
     284          725 :     // Long answer:  https://github.com/rust-lang/rust/issues/39575
     285          725 :     // Short answer: in a multi-threaded program, other threads may have
     286          725 :     // been inside of critical sections at the time of fork. In the
     287          725 :     // original process, that was all right, assuming they protected
     288          725 :     // the critical sections appropriately, e.g., through locks.
     289          725 :     // Fork adds another process to the mix that
     290          725 :     //   1. Has a single thread T
     291          725 :     //   2. In an exact copy of the address space at the time of fork.
     292          725 :     // A variety of problems can occur now:
     293          725 :     //   1. T tries to grab a lock that was locked at the time of fork.
     294          725 :     //      It will wait forever since in its address space, the lock
     295          725 :     //      is in state 'taken' but the thread that would unlock it is
     296          725 :     //      not there.
     297          725 :     //   2. A rust object that represented some external resource in the
     298          725 :     //      parent now got implicitly copied by the fork, even though
     299          725 :     //      the object's type is not `Copy`. The parent program may use
     300          725 :     //      non-copyability as way to enforce unique ownership of an
     301          725 :     //      external resource in the typesystem. The fork breaks that
     302          725 :     //      assumption, as now both parent and child process have an
     303          725 :     //      owned instance of the object that represents the same
     304          725 :     //      underlying resource.
     305          725 :     // While these seem like niche problems, (1) in particular is
     306          725 :     // highly relevant. For example, `malloc()` may grab a mutex internally,
     307          725 :     // and so, if we forked while another thread was mallocing and our
     308          725 :     // pre_exec closure allocates as well, it will block on the malloc
     309          725 :     // mutex forever
     310          725 :     //
     311          725 :     // The proper solution is to only use C library functions that are marked
     312          725 :     // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
     313          725 :     //
     314          725 :     // With this specific pre_exec() closure, the non-error path doesn't allocate.
     315          725 :     // The error path uses `anyhow`, and hence does allocate.
     316          725 :     // We take our chances there, hoping that any potential disaster is constrained
     317          725 :     // to the child process (e.g., malloc has no state outside of the child process).
     318          725 :     // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
     319          725 :     // Again, we take our chances, making the same assumptions as for malloc.
     320          725 :     unsafe {
     321          725 :         cmd.pre_exec(move || {
     322            0 :             let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
     323            0 :             // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
     324            0 :             // remains locked after exec.
     325            0 :             nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
     326            0 :                 .expect("remove FD_CLOEXEC");
     327            0 :             // Don't run drop(file), it would close the file before we actually exec.
     328            0 :             std::mem::forget(file);
     329            0 :             Ok(())
     330          725 :         });
     331          725 :     }
     332          725 :     cmd
     333          725 : }
     334              : 
     335         3717 : async fn process_started<F, Fut>(
     336         3717 :     pid: Pid,
     337         3717 :     pid_file_to_check: &Utf8Path,
     338         3717 :     status_check: &F,
     339         3717 : ) -> anyhow::Result<bool>
     340         3717 : where
     341         3717 :     F: Fn() -> Fut,
     342         3717 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
     343         3717 : {
     344         8273 :     match status_check().await {
     345         1843 :         Ok(true) => match pid_file::read(pid_file_to_check)? {
     346            0 :             PidFileRead::NotExist => Ok(false),
     347         1843 :             PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     348            0 :             PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     349              :         },
     350         1874 :         Ok(false) => Ok(false),
     351            0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     352              :     }
     353         3717 : }
     354              : 
     355         3680 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     356         3680 :     match kill(pid, None) {
     357              :         // Process exists, keep waiting
     358         1628 :         Ok(_) => Ok(false),
     359              :         // Process not found, we're done
     360         2052 :         Err(Errno::ESRCH) => Ok(true),
     361            0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     362              :     }
     363         3680 : }
        

Generated by: LCOV version 2.1-beta