LCOV - differential code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 82.1 % 229 188 41 188
Current Date: 2024-01-09 02:06:09 Functions: 43.1 % 51 22 29 22
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Spawns and kills background processes that are needed by Neon CLI.
       2                 : //! Applies common set-up such as log and pid files (if needed) to every process.
       3                 : //!
       4                 : //! Neon CLI does not run in background, so it needs to store the information about
       5                 : //! spawned processes, which it does in this module.
       6                 : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7                 : //! The pid file can be created by the process itself
       8                 : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
       9                 : //! or we create such file after starting the process
      10                 : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11                 : //! The pid stored in the file is later used to stop the service.
      12                 : //!
      13                 : //! See the [`lock_file`](utils::lock_file) module for more info.
      14                 : 
      15                 : use std::ffi::OsStr;
      16                 : use std::io::Write;
      17                 : use std::os::unix::prelude::AsRawFd;
      18                 : use std::os::unix::process::CommandExt;
      19                 : use std::path::Path;
      20                 : use std::process::{Child, Command};
      21                 : use std::time::Duration;
      22                 : use std::{fs, io, thread};
      23                 : 
      24                 : use anyhow::Context;
      25                 : use camino::{Utf8Path, Utf8PathBuf};
      26                 : use nix::errno::Errno;
      27                 : use nix::fcntl::{FcntlArg, FdFlag};
      28                 : use nix::sys::signal::{kill, Signal};
      29                 : use nix::unistd::Pid;
      30                 : use utils::pid_file::{self, PidFileRead};
      31                 : 
      32                 : // These constants control the loop used to poll for process start / stop.
      33                 : //
      34                 : // The loop waits for at most 10 seconds, polling every 100 ms.
      35                 : // Once a second, it prints a dot ("."), to give the user an indication that
      36                 : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      37                 : // it prints a notice that it's taking long, but keeps waiting.
      38                 : //
      39                 : const RETRY_UNTIL_SECS: u64 = 10;
      40                 : const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
      41                 : const RETRY_INTERVAL_MILLIS: u64 = 100;
      42                 : const DOT_EVERY_RETRIES: u64 = 10;
      43                 : const NOTICE_AFTER_RETRIES: u64 = 50;
      44                 : 
      45                 : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      46                 : /// it itself.
      47                 : pub enum InitialPidFile {
      48                 :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      49                 :     Create(Utf8PathBuf),
      50                 :     /// The process will create the pidfile itself, need to wait for that event.
      51                 :     Expect(Utf8PathBuf),
      52                 : }
      53                 : 
      54                 : /// Start a background child process using the parameters given.
      55 CBC        1381 : pub async fn start_process<F, Fut, AI, A, EI>(
      56            1381 :     process_name: &str,
      57            1381 :     datadir: &Path,
      58            1381 :     command: &Path,
      59            1381 :     args: AI,
      60            1381 :     envs: EI,
      61            1381 :     initial_pid_file: InitialPidFile,
      62            1381 :     process_status_check: F,
      63            1381 : ) -> anyhow::Result<Child>
      64            1381 : where
      65            1381 :     F: Fn() -> Fut,
      66            1381 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
      67            1381 :     AI: IntoIterator<Item = A>,
      68            1381 :     A: AsRef<OsStr>,
      69            1381 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      70            1381 :     EI: IntoIterator<Item = (String, String)>,
      71            1381 : {
      72            1381 :     let log_path = datadir.join(format!("{process_name}.log"));
      73            1381 :     let process_log_file = fs::OpenOptions::new()
      74            1381 :         .create(true)
      75            1381 :         .write(true)
      76            1381 :         .append(true)
      77            1381 :         .open(&log_path)
      78            1381 :         .with_context(|| {
      79 UBC           0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      80 CBC        1381 :         })?;
      81            1381 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      82 UBC           0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      83 CBC        1381 :     })?;
      84                 : 
      85            1381 :     let mut command = Command::new(command);
      86            1381 :     let background_command = command
      87            1381 :         .stdout(process_log_file)
      88            1381 :         .stderr(same_file_for_stderr)
      89            1381 :         .args(args);
      90            1381 :     let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
      91            1381 :     filled_cmd.envs(envs);
      92                 : 
      93            1381 :     let pid_file_to_check = match &initial_pid_file {
      94             339 :         InitialPidFile::Create(path) => {
      95             339 :             pre_exec_create_pidfile(filled_cmd, path);
      96             339 :             path
      97                 :         }
      98            1042 :         InitialPidFile::Expect(path) => path,
      99                 :     };
     100                 : 
     101            1381 :     let mut spawned_process = filled_cmd.spawn().with_context(|| {
     102 UBC           0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     103 CBC        1381 :     })?;
     104            1381 :     let pid = spawned_process.id();
     105            1381 :     let pid = Pid::from_raw(
     106            1381 :         i32::try_from(pid)
     107            1381 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     108                 :     );
     109                 : 
     110            2442 :     for retries in 0..RETRIES {
     111            5317 :         match process_started(pid, pid_file_to_check, &process_status_check).await {
     112                 :             Ok(true) => {
     113            1381 :                 println!("\n{process_name} started, pid: {pid}");
     114            1381 :                 return Ok(spawned_process);
     115                 :             }
     116                 :             Ok(false) => {
     117            1061 :                 if retries == NOTICE_AFTER_RETRIES {
     118 UBC           0 :                     // The process is taking a long time to start up. Keep waiting, but
     119               0 :                     // print a message
     120               0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     121 CBC        1061 :                 }
     122            1061 :                 if retries % DOT_EVERY_RETRIES == 0 {
     123            1043 :                     print!(".");
     124            1043 :                     io::stdout().flush().unwrap();
     125            1043 :                 }
     126            1061 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     127                 :             }
     128 UBC           0 :             Err(e) => {
     129               0 :                 println!("{process_name} failed to start: {e:#}");
     130               0 :                 if let Err(e) = spawned_process.kill() {
     131               0 :                     println!("Could not stop {process_name} subprocess: {e:#}")
     132               0 :                 };
     133               0 :                 return Err(e);
     134                 :             }
     135                 :         }
     136                 :     }
     137               0 :     println!();
     138               0 :     anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
     139 CBC        1381 : }
     140                 : 
     141                 : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     142            1418 : pub fn stop_process(
     143            1418 :     immediate: bool,
     144            1418 :     process_name: &str,
     145            1418 :     pid_file: &Utf8Path,
     146            1418 : ) -> anyhow::Result<()> {
     147            1418 :     let pid = match pid_file::read(pid_file)
     148            1418 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     149                 :     {
     150                 :         PidFileRead::NotExist => {
     151 UBC           0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     152               0 :             return Ok(());
     153                 :         }
     154                 :         PidFileRead::NotHeldByAnyProcess(_) => {
     155                 :             // Don't try to kill according to file contents because the pid might have been re-used by another process.
     156                 :             // Don't delete the file either, it can race with new pid file creation.
     157                 :             // Read `pid_file` module comment for details.
     158 CBC          48 :             println!(
     159              48 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     160              48 :             );
     161              48 :             return Ok(());
     162                 :         }
     163            1370 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     164                 :     };
     165                 :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     166                 : 
     167                 :     // send signal
     168            1370 :     let sig = if immediate {
     169            1094 :         print!("Stopping {process_name} with pid {pid} immediately..");
     170            1094 :         Signal::SIGQUIT
     171                 :     } else {
     172             276 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     173             276 :         Signal::SIGTERM
     174                 :     };
     175            1370 :     io::stdout().flush().unwrap();
     176            1370 :     match kill(pid, sig) {
     177            1370 :         Ok(()) => (),
     178                 :         Err(Errno::ESRCH) => {
     179                 :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     180 UBC           0 :             println!(
     181               0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     182               0 :             );
     183               0 :             return Ok(());
     184                 :         }
     185               0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     186                 :     }
     187                 : 
     188                 :     // Wait until process is gone
     189 CBC        1370 :     wait_until_stopped(process_name, pid)?;
     190            1370 :     Ok(())
     191            1418 : }
     192                 : 
     193            1906 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     194            3424 :     for retries in 0..RETRIES {
     195            3424 :         match process_has_stopped(pid) {
     196                 :             Ok(true) => {
     197            1906 :                 println!("\n{process_name} stopped");
     198            1906 :                 return Ok(());
     199                 :             }
     200                 :             Ok(false) => {
     201            1518 :                 if retries == NOTICE_AFTER_RETRIES {
     202 UBC           0 :                     // The process is taking a long time to stop. Keep waiting, but
     203               0 :                     // print a message
     204               0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     205 CBC        1518 :                 }
     206            1518 :                 if retries % DOT_EVERY_RETRIES == 0 {
     207            1384 :                     print!(".");
     208            1384 :                     io::stdout().flush().unwrap();
     209            1384 :                 }
     210            1518 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     211                 :             }
     212 UBC           0 :             Err(e) => {
     213               0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     214               0 :                 return Err(e);
     215                 :             }
     216                 :         }
     217                 :     }
     218               0 :     println!();
     219               0 :     anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
     220 CBC        1906 : }
     221                 : 
     222            1381 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     223            1381 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     224            1381 :     // to RUST_BACKTRACE=1.
     225            1381 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     226            1381 :     let backtrace_setting = backtrace_setting
     227            1381 :         .as_deref()
     228            1381 :         .unwrap_or_else(|| OsStr::new("1"));
     229            1381 : 
     230            1381 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     231                 : 
     232                 :     // Pass through these environment variables to the command
     233            4143 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     234            4143 :         if let Some(val) = std::env::var_os(var) {
     235            1393 :             filled_cmd = filled_cmd.env(var, val);
     236            2750 :         }
     237                 :     }
     238                 : 
     239            1381 :     filled_cmd
     240            1381 : }
     241                 : 
     242            1381 : fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     243            6905 :     for env_key in [
     244                 :         "AWS_ACCESS_KEY_ID",
     245            1381 :         "AWS_SECRET_ACCESS_KEY",
     246            1381 :         "AWS_SESSION_TOKEN",
     247            1381 :         "AZURE_STORAGE_ACCOUNT",
     248            1381 :         "AZURE_STORAGE_ACCESS_KEY",
     249                 :     ] {
     250            6905 :         if let Ok(value) = std::env::var(env_key) {
     251            2762 :             cmd = cmd.env(env_key, value);
     252            4143 :         }
     253                 :     }
     254            1381 :     cmd
     255            1381 : }
     256                 : 
     257                 : /// Add a `pre_exec` to the cmd that, in between fork() and exec(),
     258                 : /// 1. Claims a pidfile with a fcntl lock on it and
     259                 : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
     260                 : ///    will remain held until the cmd exits.
     261             339 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
     262             339 : where
     263             339 :     P: Into<Utf8PathBuf>,
     264             339 : {
     265             339 :     let path: Utf8PathBuf = path.into();
     266             339 :     // SAFETY:
     267             339 :     // pre_exec is marked unsafe because it runs between fork and exec.
     268             339 :     // Why is that dangerous in various ways?
     269             339 :     // Long answer:  https://github.com/rust-lang/rust/issues/39575
     270             339 :     // Short answer: in a multi-threaded program, other threads may have
     271             339 :     // been inside of critical sections at the time of fork. In the
     272             339 :     // original process, that was all right, assuming they protected
     273             339 :     // the critical sections appropriately, e.g., through locks.
     274             339 :     // Fork adds another process to the mix that
     275             339 :     //   1. Has a single thread T
     276             339 :     //   2. In an exact copy of the address space at the time of fork.
     277             339 :     // A variety of problems can occur now:
     278             339 :     //   1. T tries to grab a lock that was locked at the time of fork.
     279             339 :     //      It will wait forever since in its address space, the lock
     280             339 :     //      is in state 'taken' but the thread that would unlock it is
     281             339 :     //      not there.
     282             339 :     //   2. A rust object that represented some external resource in the
     283             339 :     //      parent now got implicitly copied by the fork, even though
     284             339 :     //      the object's type is not `Copy`. The parent program may use
     285             339 :     //      non-copyability as a way to enforce unique ownership of an
     286             339 :     //      external resource in the typesystem. The fork breaks that
     287             339 :     //      assumption, as now both parent and child process have an
     288             339 :     //      owned instance of the object that represents the same
     289             339 :     //      underlying resource.
     290             339 :     // While these seem like niche problems, (1) in particular is
     291             339 :     // highly relevant. For example, `malloc()` may grab a mutex internally,
     292             339 :     // and so, if we forked while another thread was mallocing and our
     293             339 :     // pre_exec closure allocates as well, it will block on the malloc
     294             339 :     // mutex forever.
     295             339 :     //
     296             339 :     // The proper solution is to only use C library functions that are marked
     297             339 :     // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
     298             339 :     //
     299             339 :     // With this specific pre_exec() closure, the non-error path doesn't allocate.
     300             339 :     // The error path uses `anyhow`, and hence does allocate.
     301             339 :     // We take our chances there, hoping that any potential disaster is constrained
     302             339 :     // to the child process (e.g., malloc has no state outside of the child process).
     303             339 :     // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
     304             339 :     // Again, we take our chances, making the same assumptions as for malloc.
     305             339 :     unsafe {
     306             339 :         cmd.pre_exec(move || {
     307 UBC           0 :             let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
     308               0 :             // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
     309               0 :             // remains locked after exec.
     310               0 :             nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
     311               0 :                 .expect("remove FD_CLOEXEC");
     312               0 :             // Don't run drop(file), it would close the file before we actually exec.
     313               0 :             std::mem::forget(file);
     314               0 :             Ok(())
     315 CBC         339 :         });
     316             339 :     }
     317             339 :     cmd
     318             339 : }
     319                 : 
     320            2442 : async fn process_started<F, Fut>(
     321            2442 :     pid: Pid,
     322            2442 :     pid_file_to_check: &Utf8Path,
     323            2442 :     status_check: &F,
     324            2442 : ) -> anyhow::Result<bool>
     325            2442 : where
     326            2442 :     F: Fn() -> Fut,
     327            2442 :     Fut: std::future::Future<Output = anyhow::Result<bool>>,
     328            2442 : {
     329            5317 :     match status_check().await {
     330            1381 :         Ok(true) => match pid_file::read(pid_file_to_check)? {
     331 UBC           0 :             PidFileRead::NotExist => Ok(false),
     332 CBC        1381 :             PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     333 UBC           0 :             PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     334                 :         },
     335 CBC        1061 :         Ok(false) => Ok(false),
     336 UBC           0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     337                 :     }
     338 CBC        2442 : }
     339                 : 
     340            3424 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     341            3424 :     match kill(pid, None) {
     342                 :         // Process exists, keep waiting
     343            1518 :         Ok(_) => Ok(false),
     344                 :         // Process not found, we're done
     345            1906 :         Err(Errno::ESRCH) => Ok(true),
     346 UBC           0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     347                 :     }
     348 CBC        3424 : }
        

Generated by: LCOV version 2.1-beta