LCOV - code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 81.1 % 222 180
Test Date: 2023-09-06 10:18:01 Functions: 44.4 % 27 12

            Line data    Source code
       1              : //! Spawns and kills background processes that are needed by Neon CLI.
       2              : //! Applies common set-up such as log and pid files (if needed) to every process.
       3              : //!
       4              : //! Neon CLI does not run in background, so it needs to store the information about
       5              : //! spawned processes, which it does in this module.
       6              : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7              : //! The pid file can be created by the process itself
       8              : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
       9              : //! or we create such file after starting the process
      10              : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11              : //! The pid stored in the file is later used to stop the service.
      12              : //!
      13              : //! See the [`lock_file`](utils::lock_file) module for more info.
      14              : 
      15              : use std::ffi::OsStr;
      16              : use std::io::Write;
      17              : use std::os::unix::prelude::AsRawFd;
      18              : use std::os::unix::process::CommandExt;
      19              : use std::path::{Path, PathBuf};
      20              : use std::process::{Child, Command};
      21              : use std::time::Duration;
      22              : use std::{fs, io, thread};
      23              : 
      24              : use anyhow::Context;
      25              : use nix::errno::Errno;
      26              : use nix::fcntl::{FcntlArg, FdFlag};
      27              : use nix::sys::signal::{kill, Signal};
      28              : use nix::unistd::Pid;
      29              : use utils::pid_file::{self, PidFileRead};
      30              : 
      31              : // These constants control the loop used to poll for process start / stop.
      32              : //
      33              : // The loop waits for at most 10 seconds, polling every 100 ms.
      34              : // Once a second, it prints a dot ("."), to give the user an indication that
      35              : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      36              : // it prints a notice that it's taking long, but keeps waiting.
      37              : //
      38              : const RETRY_UNTIL_SECS: u64 = 10;
      39              : const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
      40              : const RETRY_INTERVAL_MILLIS: u64 = 100;
      41              : const DOT_EVERY_RETRIES: u64 = 10;
      42              : const NOTICE_AFTER_RETRIES: u64 = 50;
      43              : 
      44              : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      45              : /// it itself.
      46              : pub enum InitialPidFile<'t> {
      47              :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      48              :     Create(&'t Path),
      49              :     /// The process will create the pidfile itself, need to wait for that event.
      50              :     Expect(&'t Path),
      51              : }
      52              : 
      53              : /// Start a background child process using the parameters given.
      54         1096 : pub fn start_process<F, AI, A, EI>(
      55         1096 :     process_name: &str,
      56         1096 :     datadir: &Path,
      57         1096 :     command: &Path,
      58         1096 :     args: AI,
      59         1096 :     envs: EI,
      60         1096 :     initial_pid_file: InitialPidFile,
      61         1096 :     process_status_check: F,
      62         1096 : ) -> anyhow::Result<Child>
      63         1096 : where
      64         1096 :     F: Fn() -> anyhow::Result<bool>,
      65         1096 :     AI: IntoIterator<Item = A>,
      66         1096 :     A: AsRef<OsStr>,
      67         1096 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      68         1096 :     EI: IntoIterator<Item = (String, String)>,
      69         1096 : {
      70         1096 :     let log_path = datadir.join(format!("{process_name}.log"));
      71         1096 :     let process_log_file = fs::OpenOptions::new()
      72         1096 :         .create(true)
      73         1096 :         .write(true)
      74         1096 :         .append(true)
      75         1096 :         .open(&log_path)
      76         1096 :         .with_context(|| {
      77            0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      78         1096 :         })?;
      79         1096 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      80            0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      81         1096 :     })?;
      82              : 
      83         1096 :     let mut command = Command::new(command);
      84         1096 :     let background_command = command
      85         1096 :         .stdout(process_log_file)
      86         1096 :         .stderr(same_file_for_stderr)
      87         1096 :         .args(args);
      88         1096 :     let filled_cmd = fill_aws_secrets_vars(fill_rust_env_vars(background_command));
      89         1096 :     filled_cmd.envs(envs);
      90              : 
      91         1096 :     let pid_file_to_check = match initial_pid_file {
      92            4 :         InitialPidFile::Create(path) => {
      93            4 :             pre_exec_create_pidfile(filled_cmd, path);
      94            4 :             path
      95              :         }
      96         1092 :         InitialPidFile::Expect(path) => path,
      97              :     };
      98              : 
      99         1096 :     let mut spawned_process = filled_cmd.spawn().with_context(|| {
     100            0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     101         1096 :     })?;
     102         1096 :     let pid = spawned_process.id();
     103         1096 :     let pid = Pid::from_raw(
     104         1096 :         i32::try_from(pid)
     105         1096 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     106              :     );
     107              : 
     108         2205 :     for retries in 0..RETRIES {
     109         2205 :         match process_started(pid, Some(pid_file_to_check), &process_status_check) {
     110              :             Ok(true) => {
     111         1096 :                 println!("\n{process_name} started, pid: {pid}");
     112         1096 :                 return Ok(spawned_process);
     113              :             }
     114              :             Ok(false) => {
     115         1109 :                 if retries == NOTICE_AFTER_RETRIES {
     116            0 :                     // The process is taking a long time to start up. Keep waiting, but
     117            0 :                     // print a message
     118            0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     119         1109 :                 }
     120         1109 :                 if retries % DOT_EVERY_RETRIES == 0 {
     121         1093 :                     print!(".");
     122         1093 :                     io::stdout().flush().unwrap();
     123         1093 :                 }
     124         1109 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     125              :             }
     126            0 :             Err(e) => {
     127            0 :                 println!("{process_name} failed to start: {e:#}");
     128            0 :                 if let Err(e) = spawned_process.kill() {
     129            0 :                     println!("Could not stop {process_name} subprocess: {e:#}")
     130            0 :                 };
     131            0 :                 return Err(e);
     132              :             }
     133              :         }
     134              :     }
     135            0 :     println!();
     136            0 :     anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
     137         1096 : }
     138              : 
     139              : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     140         1128 : pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> anyhow::Result<()> {
     141         1128 :     let pid = match pid_file::read(pid_file)
     142         1128 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     143              :     {
     144              :         PidFileRead::NotExist => {
     145            0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     146            0 :             return Ok(());
     147              :         }
     148              :         PidFileRead::NotHeldByAnyProcess(_) => {
     149              :             // Don't try to kill according to file contents because the pid might have been re-used by another process.
     150              :             // Don't delete the file either, it can race with new pid file creation.
     151              :             // Read `pid_file` module comment for details.
     152           38 :             println!(
     153           38 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     154           38 :             );
     155           38 :             return Ok(());
     156              :         }
     157         1090 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     158              :     };
     159              :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     160              : 
     161              :     // send signal
     162         1090 :     let sig = if immediate {
     163          836 :         print!("Stopping {process_name} with pid {pid} immediately..");
     164          836 :         Signal::SIGQUIT
     165              :     } else {
     166          254 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     167          254 :         Signal::SIGTERM
     168              :     };
     169         1090 :     io::stdout().flush().unwrap();
     170         1090 :     match kill(pid, sig) {
     171         1090 :         Ok(()) => (),
     172              :         Err(Errno::ESRCH) => {
     173              :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     174            0 :             println!(
     175            0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     176            0 :             );
     177            0 :             return Ok(());
     178              :         }
     179            0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     180              :     }
     181              : 
     182              :     // Wait until process is gone
     183         1090 :     wait_until_stopped(process_name, pid)?;
     184         1090 :     Ok(())
     185         1128 : }
     186              : 
     187         1720 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     188         2975 :     for retries in 0..RETRIES {
     189         2975 :         match process_has_stopped(pid) {
     190              :             Ok(true) => {
     191         1720 :                 println!("\n{process_name} stopped");
     192         1720 :                 return Ok(());
     193              :             }
     194              :             Ok(false) => {
     195         1255 :                 if retries == NOTICE_AFTER_RETRIES {
     196            0 :                     // The process is taking a long time to stop. Keep waiting, but
     197            0 :                     // print a message
     198            0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     199         1255 :                 }
     200         1255 :                 if retries % DOT_EVERY_RETRIES == 0 {
     201         1117 :                     print!(".");
     202         1117 :                     io::stdout().flush().unwrap();
     203         1117 :                 }
     204         1255 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     205              :             }
     206            0 :             Err(e) => {
     207            0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     208            0 :                 return Err(e);
     209              :             }
     210              :         }
     211              :     }
     212            0 :     println!();
     213            0 :     anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
     214         1720 : }
     215              : 
     216         1096 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     217         1096 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     218         1096 :     // to RUST_BACKTRACE=1.
     219         1096 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     220         1096 :     let backtrace_setting = backtrace_setting
     221         1096 :         .as_deref()
     222         1096 :         .unwrap_or_else(|| OsStr::new("1"));
     223         1096 : 
     224         1096 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     225              : 
     226              :     // Pass through these environment variables to the command
     227         3288 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     228         3288 :         if let Some(val) = std::env::var_os(var) {
     229         1106 :             filled_cmd = filled_cmd.env(var, val);
     230         2182 :         }
     231              :     }
     232              : 
     233         1096 :     filled_cmd
     234         1096 : }
     235              : 
     236         1096 : fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     237         3288 :     for env_key in [
     238              :         "AWS_ACCESS_KEY_ID",
     239         1096 :         "AWS_SECRET_ACCESS_KEY",
     240         1096 :         "AWS_SESSION_TOKEN",
     241              :     ] {
     242         3288 :         if let Ok(value) = std::env::var(env_key) {
     243         2192 :             cmd = cmd.env(env_key, value);
     244         2192 :         }
     245              :     }
     246         1096 :     cmd
     247         1096 : }
     248              : 
     249              : /// Add a `pre_exec` to the cmd that, in between fork() and exec(),
     250              : /// 1. Claims a pidfile with a fcntl lock on it and
     251              : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
     252              : ///    will remain held until the cmd exits.
     253            4 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
     254            4 : where
     255            4 :     P: Into<PathBuf>,
     256            4 : {
     257            4 :     let path: PathBuf = path.into();
     258            4 :     // SAFETY
     259            4 :     // pre_exec is marked unsafe because it runs between fork and exec.
     260            4 :     // Why is that dangerous in various ways?
     261            4 :     // Long answer:  https://github.com/rust-lang/rust/issues/39575
     262            4 :     // Short answer: in a multi-threaded program, other threads may have
     263            4 :     // been inside of critical sections at the time of fork. In the
     264            4 :     // original process, that was all right, assuming they protected
     265            4 :     // the critical sections appropriately, e.g., through locks.
     266            4 :     // Fork adds another process to the mix that
     267            4 :     //   1. Has a single thread T
     268            4 :     //   2. In an exact copy of the address space at the time of fork.
     269            4 :     // A variety of problems can occur now:
     270            4 :     //   1. T tries to grab a lock that was locked at the time of fork.
     271            4 :     //      It will wait forever since in its address space, the lock
     272            4 :     //      is in state 'taken' but the thread that would unlock it is
     273            4 :     //      not there.
     274            4 :     //   2. A rust object that represented some external resource in the
     275            4 :     //      parent now got implicitly copied by the fork, even though
     276            4 :     //      the object's type is not `Copy`. The parent program may use
     277            4 :     //      non-copyability as a way to enforce unique ownership of an
     278            4 :     //      external resource in the typesystem. The fork breaks that
     279            4 :     //      assumption, as now both parent and child process have an
     280            4 :     //      owned instance of the object that represents the same
     281            4 :     //      underlying resource.
     282            4 :     // While these seem like niche problems, (1) in particular is
     283            4 :     // highly relevant. For example, `malloc()` may grab a mutex internally,
     284            4 :     // and so, if we forked while another thread was mallocing and our
     285            4 :     // pre_exec closure allocates as well, it will block on the malloc
     286            4 :     // mutex forever
     287            4 :     //
     288            4 :     // The proper solution is to only use C library functions that are marked
     289            4 :     // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
     290            4 :     //
     291            4 :     // With this specific pre_exec() closure, the non-error path doesn't allocate.
     292            4 :     // The error path uses `anyhow`, and hence does allocate.
     293            4 :     // We take our chances there, hoping that any potential disaster is constrained
     294            4 :     // to the child process (e.g., malloc has no state outside of the child process).
     295            4 :     // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
     296            4 :     // Again, we take our chances, making the same assumptions as for malloc.
     297            4 :     unsafe {
     298            4 :         cmd.pre_exec(move || {
     299            0 :             let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
     300            0 :             // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
     301            0 :             // remains locked after exec.
     302            0 :             nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
     303            0 :                 .expect("remove FD_CLOEXEC");
     304            0 :             // Don't run drop(file), it would close the file before we actually exec.
     305            0 :             std::mem::forget(file);
     306            0 :             Ok(())
     307            4 :         });
     308            4 :     }
     309            4 :     cmd
     310            4 : }
     311              : 
     312         2205 : fn process_started<F>(
     313         2205 :     pid: Pid,
     314         2205 :     pid_file_to_check: Option<&Path>,
     315         2205 :     status_check: &F,
     316         2205 : ) -> anyhow::Result<bool>
     317         2205 : where
     318         2205 :     F: Fn() -> anyhow::Result<bool>,
     319         2205 : {
     320         2205 :     match status_check() {
     321         1096 :         Ok(true) => match pid_file_to_check {
     322         1096 :             Some(pid_file_path) => match pid_file::read(pid_file_path)? {
     323            0 :                 PidFileRead::NotExist => Ok(false),
     324         1096 :                 PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     325            0 :                 PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     326              :             },
     327            0 :             None => Ok(true),
     328              :         },
     329         1109 :         Ok(false) => Ok(false),
     330            0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     331              :     }
     332         2205 : }
     333              : 
     334              : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     335         2975 :     match kill(pid, None) {
     336              :         // Process exists, keep waiting
     337         1255 :         Ok(_) => Ok(false),
     338              :         // Process not found, we're done
     339         1720 :         Err(Errno::ESRCH) => Ok(true),
     340            0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     341              :     }
     342         2975 : }
        

Generated by: LCOV version 2.1-beta