LCOV - code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 79.8 % 242 193
Test Date: 2024-02-14 18:05:35 Functions: 33.8 % 77 26

            Line data    Source code
       1              : //! Spawns and kills background processes that are needed by Neon CLI.
       2              : //! Applies common set-up such as log and pid files (if needed) to every process.
       3              : //!
       4              : //! Neon CLI does not run in background, so it needs to store the information about
       5              : //! spawned processes, which it does in this module.
       6              : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7              : //! The pid file can be created by the process itself
       8              : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
//! or we create such a file ourselves, claiming it in the child process between fork() and exec()
//! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11              : //! The pid stored in the file is later used to stop the service.
      12              : //!
      13              : //! See the [`lock_file`](utils::lock_file) module for more info.
      14              : 
      15              : use std::ffi::OsStr;
      16              : use std::io::Write;
      17              : use std::os::unix::prelude::AsRawFd;
      18              : use std::os::unix::process::CommandExt;
      19              : use std::path::Path;
      20              : use std::process::Command;
      21              : use std::time::Duration;
      22              : use std::{fs, io, thread};
      23              : 
      24              : use anyhow::Context;
      25              : use camino::{Utf8Path, Utf8PathBuf};
      26              : use nix::errno::Errno;
      27              : use nix::fcntl::{FcntlArg, FdFlag};
      28              : use nix::sys::signal::{kill, Signal};
      29              : use nix::unistd::Pid;
      30              : use utils::pid_file::{self, PidFileRead};
      31              : 
      32              : // These constants control the loop used to poll for process start / stop.
      33              : //
      34              : // The loop waits for at most 10 seconds, polling every 100 ms.
      35              : // Once a second, it prints a dot ("."), to give the user an indication that
      36              : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      37              : // it prints a notice that it's taking long, but keeps waiting.
      38              : //
      39              : const RETRY_UNTIL_SECS: u64 = 10;
      40              : const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
      41              : const RETRY_INTERVAL_MILLIS: u64 = 100;
      42              : const DOT_EVERY_RETRIES: u64 = 10;
      43              : const NOTICE_AFTER_RETRIES: u64 = 50;
      44              : 
      45              : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      46              : /// it itself.
      47              : pub enum InitialPidFile {
      48              :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      49              :     Create(Utf8PathBuf),
      50              :     /// The process will create the pidfile itself, need to wait for that event.
      51              :     Expect(Utf8PathBuf),
      52              : }
      53              : 
      54              : /// Start a background child process using the parameters given.
      55         1874 : pub async fn start_process<F, Fut, AI, A, EI>(
      56         1874 :     process_name: &str,
      57         1874 :     datadir: &Path,
      58         1874 :     command: &Path,
      59         1874 :     args: AI,
      60         1874 :     envs: EI,
      61         1874 :     initial_pid_file: InitialPidFile,
      62         1874 :     process_status_check: F,
      63         1874 : ) -> anyhow::Result<()>
      64         1874 : where
      65         1874 :     F: Fn() -> Fut,
      66         1874 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
      67         1874 :     AI: IntoIterator<Item = A>,
      68         1874 :     A: AsRef<OsStr>,
      69         1874 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      70         1874 :     EI: IntoIterator<Item = (String, String)>,
      71         1874 : {
      72         1874 :     let log_path = datadir.join(format!("{process_name}.log"));
      73         1874 :     let process_log_file = fs::OpenOptions::new()
      74         1874 :         .create(true)
      75         1874 :         .append(true)
      76         1874 :         .open(&log_path)
      77         1874 :         .with_context(|| {
      78            0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      79         1874 :         })?;
      80         1874 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      81            0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      82         1874 :     })?;
      83              : 
      84         1874 :     let mut command = Command::new(command);
      85         1874 :     let background_command = command
      86         1874 :         .stdout(process_log_file)
      87         1874 :         .stderr(same_file_for_stderr)
      88         1874 :         .args(args);
      89         1874 :     let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
      90         1874 :     filled_cmd.envs(envs);
      91              : 
      92         1874 :     let pid_file_to_check = match &initial_pid_file {
      93          735 :         InitialPidFile::Create(path) => {
      94          735 :             pre_exec_create_pidfile(filled_cmd, path);
      95          735 :             path
      96              :         }
      97         1139 :         InitialPidFile::Expect(path) => path,
      98              :     };
      99              : 
     100         1874 :     let spawned_process = filled_cmd.spawn().with_context(|| {
     101            0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     102         1874 :     })?;
     103         1874 :     let pid = spawned_process.id();
     104         1874 :     let pid = Pid::from_raw(
     105         1874 :         i32::try_from(pid)
     106         1874 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     107              :     );
     108              :     // set up a scopeguard to kill & wait for the child in case we panic or bail below
     109         1874 :     let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
     110            0 :         println!("SIGKILL & wait the started process");
     111            0 :         (|| {
     112            0 :             // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
     113            0 :             spawned_process.kill().context("SIGKILL child")?;
     114            0 :             spawned_process.wait().context("wait() for child process")?;
     115            0 :             anyhow::Ok(())
     116            0 :         })()
     117            0 :         .with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
     118            0 :         .unwrap();
     119         1874 :     });
     120              : 
     121         3851 :     for retries in 0..RETRIES {
     122         8512 :         match process_started(pid, pid_file_to_check, &process_status_check).await {
     123              :             Ok(true) => {
     124         1874 :                 println!("\n{process_name} started and passed status check, pid: {pid}");
     125         1874 :                 // leak the child process, it'll outlive this neon_local invocation
     126         1874 :                 drop(scopeguard::ScopeGuard::into_inner(spawned_process));
     127         1874 :                 return Ok(());
     128              :             }
     129              :             Ok(false) => {
     130         1977 :                 if retries == NOTICE_AFTER_RETRIES {
     131            0 :                     // The process is taking a long time to start up. Keep waiting, but
     132            0 :                     // print a message
     133            0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     134         1977 :                 }
     135         1977 :                 if retries % DOT_EVERY_RETRIES == 0 {
     136         1872 :                     print!(".");
     137         1872 :                     io::stdout().flush().unwrap();
     138         1872 :                 }
     139         1977 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     140              :             }
     141            0 :             Err(e) => {
     142            0 :                 println!("error starting process {process_name:?}: {e:#}");
     143            0 :                 return Err(e);
     144              :             }
     145              :         }
     146              :     }
     147            0 :     println!();
     148            0 :     anyhow::bail!(
     149            0 :         "{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds"
     150            0 :     );
     151         1874 : }
     152              : 
     153              : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     154         1549 : pub fn stop_process(
     155         1549 :     immediate: bool,
     156         1549 :     process_name: &str,
     157         1549 :     pid_file: &Utf8Path,
     158         1549 : ) -> anyhow::Result<()> {
     159         1549 :     let pid = match pid_file::read(pid_file)
     160         1549 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     161              :     {
     162              :         PidFileRead::NotExist => {
     163            0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     164            0 :             return Ok(());
     165              :         }
     166              :         PidFileRead::NotHeldByAnyProcess(_) => {
     167              :             // Don't try to kill according to file contents beacuse the pid might have been re-used by another process.
     168              :             // Don't delete the file either, it can race with new pid file creation.
     169              :             // Read `pid_file` module comment for details.
     170           47 :             println!(
     171           47 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     172           47 :             );
     173           47 :             return Ok(());
     174              :         }
     175         1502 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     176              :     };
     177              :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     178              : 
     179              :     // send signal
     180         1502 :     let sig = if immediate {
     181         1190 :         print!("Stopping {process_name} with pid {pid} immediately..");
     182         1190 :         Signal::SIGQUIT
     183              :     } else {
     184          312 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     185          312 :         Signal::SIGTERM
     186              :     };
     187         1502 :     io::stdout().flush().unwrap();
     188         1502 :     match kill(pid, sig) {
     189         1502 :         Ok(()) => (),
     190              :         Err(Errno::ESRCH) => {
     191              :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     192            0 :             println!(
     193            0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     194            0 :             );
     195            0 :             return Ok(());
     196              :         }
     197            0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     198              :     }
     199              : 
     200              :     // Wait until process is gone
     201         1502 :     wait_until_stopped(process_name, pid)?;
     202         1502 :     Ok(())
     203         1549 : }
     204              : 
     205         2074 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     206         3791 :     for retries in 0..RETRIES {
     207         3791 :         match process_has_stopped(pid) {
     208              :             Ok(true) => {
     209         2074 :                 println!("\n{process_name} stopped");
     210         2074 :                 return Ok(());
     211              :             }
     212              :             Ok(false) => {
     213         1717 :                 if retries == NOTICE_AFTER_RETRIES {
     214            0 :                     // The process is taking a long time to start up. Keep waiting, but
     215            0 :                     // print a message
     216            0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     217         1717 :                 }
     218         1717 :                 if retries % DOT_EVERY_RETRIES == 0 {
     219         1531 :                     print!(".");
     220         1531 :                     io::stdout().flush().unwrap();
     221         1531 :                 }
     222         1717 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     223              :             }
     224            0 :             Err(e) => {
     225            0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     226            0 :                 return Err(e);
     227              :             }
     228              :         }
     229              :     }
     230            0 :     println!();
     231            0 :     anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
     232         2074 : }
     233              : 
     234         1874 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     235         1874 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     236         1874 :     // to RUST_BACKTRACE=1.
     237         1874 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     238         1874 :     let backtrace_setting = backtrace_setting
     239         1874 :         .as_deref()
     240         1874 :         .unwrap_or_else(|| OsStr::new("1"));
     241         1874 : 
     242         1874 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     243              : 
     244              :     // Pass through these environment variables to the command
     245         5622 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     246         5622 :         if let Some(val) = std::env::var_os(var) {
     247         1882 :             filled_cmd = filled_cmd.env(var, val);
     248         3740 :         }
     249              :     }
     250              : 
     251         1874 :     filled_cmd
     252         1874 : }
     253              : 
     254         1874 : fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     255        11244 :     for env_key in [
     256              :         "AWS_ACCESS_KEY_ID",
     257         1874 :         "AWS_SECRET_ACCESS_KEY",
     258         1874 :         "AWS_PROFILE",
     259         1874 :         // HOME is needed in combination with `AWS_PROFILE` to pick up the SSO sessions.
     260         1874 :         "HOME",
     261         1874 :         "AZURE_STORAGE_ACCOUNT",
     262         1874 :         "AZURE_STORAGE_ACCESS_KEY",
     263              :     ] {
     264        11244 :         if let Ok(value) = std::env::var(env_key) {
     265         5622 :             cmd = cmd.env(env_key, value);
     266         5622 :         }
     267              :     }
     268         1874 :     cmd
     269         1874 : }
     270              : 
/// Add a `pre_exec` to the cmd that, in between fork() and exec(),
/// 1. Claims a pidfile with a fcntl lock on it and
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)
///    will remain held until the cmd exits.
///
/// `path` is converted to an owned `Utf8PathBuf` up front because it is moved into the
/// `pre_exec` closure. Returns `cmd` to allow chaining.
///
/// # Panics
/// Failures while claiming the pidfile or clearing `FD_CLOEXEC` panic via `expect`; this
/// happens inside the forked child, not in the calling process.
fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
where
    P: Into<Utf8PathBuf>,
{
    let path: Utf8PathBuf = path.into();
    // SAFETY:
    // pre_exec is marked unsafe because it runs between fork and exec.
    // Why is that dangerous in various ways?
    // Long answer:  https://github.com/rust-lang/rust/issues/39575
    // Short answer: in a multi-threaded program, other threads may have
    // been inside of critical sections at the time of fork. In the
    // original process, that was all right, assuming they protected
    // the critical sections appropriately, e.g., through locks.
    // Fork adds another process to the mix that
    //   1. Has a single thread T
    //   2. Has an exact copy of the address space at the time of fork.
    // A variety of problems can occur now:
    //   1. T tries to grab a lock that was locked at the time of fork.
    //      It will wait forever since in its address space, the lock
    //      is in state 'taken' but the thread that would unlock it is
    //      not there.
    //   2. A rust object that represented some external resource in the
    //      parent now got implicitly copied by the fork, even though
    //      the object's type is not `Copy`. The parent program may use
    //      non-copyability as a way to enforce unique ownership of an
    //      external resource in the typesystem. The fork breaks that
    //      assumption, as now both parent and child process have an
    //      owned instance of the object that represents the same
    //      underlying resource.
    // While these seem like niche problems, (1) in particular is
    // highly relevant. For example, `malloc()` may grab a mutex internally,
    // and so, if we forked while another thread was malloc'ing and our
    // pre_exec closure allocates as well, it will block on the malloc
    // mutex forever.
    //
    // The proper solution is to only use C library functions that are marked
    // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
    //
    // With this specific pre_exec() closure, the non-error path doesn't allocate.
    // The error path uses `anyhow`, and hence does allocate.
    // We take our chances there, hoping that any potential disaster is constrained
    // to the child process (e.g., malloc has no state outside of the child process).
    // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
    // Again, we take our chances, making the same assumptions as for malloc.
    unsafe {
        cmd.pre_exec(move || {
            let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
            // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
            // remains locked after exec.
            nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
                .expect("remove FD_CLOEXEC");
            // Don't run drop(file), it would close the file before we actually exec.
            std::mem::forget(file);
            Ok(())
        });
    }
    cmd
}
     333              : 
     334         3851 : async fn process_started<F, Fut>(
     335         3851 :     pid: Pid,
     336         3851 :     pid_file_to_check: &Utf8Path,
     337         3851 :     status_check: &F,
     338         3851 : ) -> anyhow::Result<bool>
     339         3851 : where
     340         3851 :     F: Fn() -> Fut,
     341         3851 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
     342         3851 : {
     343         8512 :     match status_check().await {
     344         1874 :         Ok(true) => match pid_file::read(pid_file_to_check)? {
     345            0 :             PidFileRead::NotExist => Ok(false),
     346         1874 :             PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     347            0 :             PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     348              :         },
     349         1977 :         Ok(false) => Ok(false),
     350            0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     351              :     }
     352         3851 : }
     353              : 
     354         3791 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     355         3791 :     match kill(pid, None) {
     356              :         // Process exists, keep waiting
     357         1717 :         Ok(_) => Ok(false),
     358              :         // Process not found, we're done
     359         2074 :         Err(Errno::ESRCH) => Ok(true),
     360            0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     361              :     }
     362         3791 : }
        

Generated by: LCOV version 2.1-beta